Spark Structed Streaming 코드로 읽는 부분에 대한 예시




kafka에서 json만 value로 받는다. 


json도 필드로, json에 있는 json 값의 내용도 필드로 꺼집어 내서 DataSet으로 구성하는 예이다.






즉, 


json 필드는 log 컬럼로,


json 필드의 tag 값은 tag 컬럼으로,


json 필드의 @timestamp 값은 @timestamp 컬럼으로,


json 필드의 uuid는 uuid 컬럼으로 생성해, 총 4개의 컬럼으로 구성하는 예시이다. 




    val schema = StructType(

      List(

        StructField("tag", StringType, nullable = true),

        StructField("@timestamp", StringType, nullable = true),

        StructField("uuid", StringType, nullable = true)

      )

    )


    val ds = spark.readStream.format("kafka")

      .option("kafka.bootstrap.servers", config.getString(s"kafka.$phase.brokers"))

      .option("startingOffsets", "latest")

      .option("key.deserializer", "classOf[StringDeserializer]")

      .option("value.deserializer", "classOf[StringDeserializer]")

      .option("subscribe", config.getString(s"kafka.$phase.topic.name"))

      .load()

      .selectExpr("CAST(value AS STRING)")

      .select(from_json($"value", schema).as("data"), col("value").cast("string").as("log"))

      .select("data.*", "log")

      .withColumnRenamed("tag", "tag")

      .withColumnRenamed("@timestamp", "timestamp")

      .withColumnRenamed("uuid", "uuid")

      .as[(String, String, String, String)]









기초 지식 참고

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/structured-streaming-kafka-integration.html


Posted by '김용환'
,


spark 2.x의 spark structured streaming 예시로 다음 코드를 참고하는 것이 좋다.


아래 코드는 카프카와 카산드라 연동 내용이다.


https://github.com/ansrivas/spark-structured-streaming/blob/master/src/main/scala/com/kafkaToSparkToCass/Main.scala





class SparkJob extends Serializable {

  @transient lazy val logger = Logger.getLogger(this.getClass)


  logger.setLevel(Level.INFO)

  val sparkSession =

    SparkSession.builder

      .master("local[*]")

      .appName("kafka2Spark2Cassandra")

      .config("spark.cassandra.connection.host", "localhost")

      .getOrCreate()


  val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)


  // Create keyspace and tables here, NOT in prod

  connector.withSessionDo { session =>

    Statements.createKeySpaceAndTable(session, true)

  }


  private def processRow(value: Commons.UserEvent) = {

    connector.withSessionDo { session =>

      session.execute(Statements.cql(value.user_id, value.time, value.event))

    }

  }


  def runJob() = {


    logger.info("Execution started with following configuration")

    val cols = List("user_id", "time", "event")


    import sparkSession.implicits._

    val lines = sparkSession.readStream

      .format("kafka")

      .option("subscribe", "test.1")

      .option("kafka.bootstrap.servers", "localhost:9092")

      .option("startingOffsets", "earliest")

      .load()

      .selectExpr("CAST(value AS STRING)",

                  "CAST(topic as STRING)",

                  "CAST(partition as INTEGER)")

      .as[(String, String, Integer)]


    val df =

      lines.map { line =>

        val columns = line._1.split(";") // value being sent out as a comma separated value "userid_1;2015-05-01T00:00:00;some_value"

        (columns(0), Commons.getTimeStamp(columns(1)), columns(2))

      }.toDF(cols: _*)


    df.printSchema()


    // Run your business logic here

    val ds = df.select($"user_id", $"time", $"event").as[Commons.UserEvent]


    // This Foreach sink writer writes the output to cassandra.

    import org.apache.spark.sql.ForeachWriter

    val writer = new ForeachWriter[Commons.UserEvent] {

      override def open(partitionId: Long, version: Long) = true

      override def process(value: Commons.UserEvent) = {

        processRow(value)

      }

      override def close(errorOrNull: Throwable) = {}

    }


    val query =

      ds.writeStream.queryName("kafka2Spark2Cassandra").foreach(writer).start


    query.awaitTermination()

    sparkSession.stop()

  }


Posted by '김용환'
,