Spark Structed Streaming 코드로 읽는 부분에 대한 예시




kafka에서 json만 value로 받는다. 


json도 필드로, json에 있는 json 값의 내용도 필드로 꺼집어 내서 DataSet으로 구성하는 예이다.






즉, 


json 필드는 log 컬럼로,


json 필드의 tag 값은 tag 컬럼으로,


json 필드의 @timestamp 값은 @timestamp 컬럼으로,


json 필드의 uuid는 uuid 컬럼으로 생성해, 총 4개의 컬럼으로 구성하는 예시이다. 




    val schema = StructType(

      List(

        StructField("tag", StringType, nullable = true),

        StructField("@timestamp", StringType, nullable = true),

        StructField("uuid", StringType, nullable = true)

      )

    )


    val ds = spark.readStream.format("kafka")

      .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers"))

      .option("startingOffsets", "latest")

      .option("key.deserializer", "classOf[StringDeserializer]")

      .option("value.deserializer", "classOf[StringDeserializer]")

      .option("subscribe", config.getString(s"kafka.$phase.topic.name"))

      .load()

      .selectExpr("CAST(value AS STRING)")

      .select(from_json($"value", schema).as("data"), col("value").cast("string").as("log"))

      .select("data.*", "log")

      .withColumnRenamed("tag", "tag")

      .withColumnRenamed("@timestamp", "timestamp")

      .withColumnRenamed("uuid", "uuid")

      .as[(String, String, String, String)]









기초 지식 참고

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/structured-streaming-kafka-integration.html


Posted by '김용환'
,