Spark에서 원래 json 코드와 파싱된(분류된) 데이터를 한번에 보고 싶다면 아래와 같은 코드를 참조하길 바란다.



val schema = StructType(

List( StructField("year", StringType, nullable = true), StructField("month", StringType, nullable = true), StructField("day", StringType, nullable = true) ) ) val ds = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers")) .option("startingOffsets", "latest") .option("key.deserializer", "classOf[StringDeserializer]") .option("value.deserializer", "classOf[StringDeserializer]") .option("subscribe", config.getString(s"kafka.$phase.topic.name")) .load() .selectExpr("CAST(value AS STRING)") .select(from_json($"value", schema).as("data"), col("value").cast("string")) .select("data.*", "value") .as[(String, String, String, String)]



Posted by '김용환'
,



spark streaming을 처리할 때 Encoder를 잘 이해하지 못하면,  아래 에러를 많이 만나게 된다.



Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.im plicits._ Support for serializing other types will be added in future releases.




단순히 Serializable 이슈라 하기에는 좀..

spark을 더 공부할 수 있는 꺼리가 할 수 있다.


DataFrame 및 DataSet에 대한 이해도를 높일 수 있다.



https://stackoverflow.com/questions/39433419/encoder-error-while-trying-to-map-dataframe-row-to-updated-row



https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Encoder.html




https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html



Posted by '김용환'
,