Spark에서 원래 json 코드와 파싱된(분류된) 데이터를 한번에 보고 싶다면 아래와 같은 코드를 참조하길 바란다.



val schema = StructType(

List( StructField("year", StringType, nullable = true), StructField("month", StringType, nullable = true), StructField("day", StringType, nullable = true) ) ) val ds = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers")) .option("startingOffsets", "latest") .option("key.deserializer", "classOf[StringDeserializer]") .option("value.deserializer", "classOf[StringDeserializer]") .option("subscribe", config.getString(s"kafka.$phase.topic.name")) .load() .selectExpr("CAST(value AS STRING)") .select(from_json($"value", schema).as("data"), col("value").cast("string")) .select("data.*", "value") .as[(String, String, String, String)]



Posted by '김용환'
,