spark streaming 테스르를 진행할 때 보통은 카프카나 파일을 읽는 것부터 시작하는데.
이번에는 메모리가 있나 해서 보니. 메모리가 있다.
producer 코드는 단순히 쓰레드로 만드는 코드를 추가하고
이를 spark streaming 에서 쉽게 처리하는 간단한 예제가 있다.
package streaming
import org.apache.spark.streaming.StreamingContext
import streaming.KafkaAvroDemo.INTERVAL
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.joda.time.DateTime
object MemoryStreamDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("KafkaSparkStreaming")
.config("spark.master", "local[2]")
.getOrCreate()
val SLEEP_TIME = 3000L
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, INTERVAL)
val inputStream = new MemoryStream[Int](1, spark.sqlContext)
new Thread(new Runnable {
override def run() = {
while (true) {
inputStream.addData(1, 2, 3, 4, 5)
Thread.sleep(SLEEP_TIME)
}
}
}).start()
val stream = inputStream.toDS().toDF("number")
val query = stream.writeStream.trigger(Trigger.ProcessingTime(SLEEP_TIME))
.foreachBatch((dataset, batchId) => {
dataset.foreachPartition(rows => {
rows.foreach(row => {
println(new DateTime())
println(s"batch : ${batchId} , value : ${row}")
})
})
})
.start()
query.awaitTermination(20000L)
}
}
결과는 다음과 같다.
batch : 0 , value : [1]
batch : 0 , value : [2]
batch : 0 , value : [3]
batch : 0 , value : [4]
batch : 0 , value : [5]
batch : 0 , value : [1]
batch : 0 , value : [2]
batch : 0 , value : [3]
batch : 0 , value : [4]
batch : 0 , value : [5]
....