spark streaming 테스르를 진행할 때 보통은 카프카나 파일을 읽는 것부터 시작하는데.

이번에는 메모리가 있나 해서 보니. 메모리가 있다. 

 

producer 코드는 단순히 쓰레드로 만드는 코드를 추가하고

이를 spark streaming 에서 쉽게 처리하는 간단한 예제가 있다. 

 

package streaming

import org.apache.spark.streaming.StreamingContext
import streaming.KafkaAvroDemo.INTERVAL
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.joda.time.DateTime

object MemoryStreamDemo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .config("spark.master", "local[2]")
      .getOrCreate()


    val SLEEP_TIME = 3000L

    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, INTERVAL)

    val inputStream = new MemoryStream[Int](1, spark.sqlContext)

    new Thread(new Runnable {
      override def run() = {
        while (true) {
          inputStream.addData(1, 2, 3, 4, 5)
          Thread.sleep(SLEEP_TIME)
        }
      }
    }).start()

    val stream = inputStream.toDS().toDF("number")

    val query = stream.writeStream.trigger(Trigger.ProcessingTime(SLEEP_TIME))
      .foreachBatch((dataset, batchId) => {
        dataset.foreachPartition(rows => {
          rows.foreach(row => {
            println(new DateTime())
            println(s"batch : ${batchId} , value : ${row}")
          })
        })
      })
      .start()

    query.awaitTermination(20000L)
  }
}

 

결과는 다음과 같다.

 

 

batch : 0 , value : [1]
batch : 0 , value : [2]
batch : 0 , value : [3]
batch : 0 , value : [4]
batch : 0 , value : [5]

 

batch : 0 , value : [1]
batch : 0 , value : [2]
batch : 0 , value : [3]
batch : 0 , value : [4]
batch : 0 , value : [5]

 

....

 

Posted by '김용환'
,