스파크 스트리밍 처리할 때 누산기(accumulator) 같이 처리해야 할 때가 있다.
아래 예시는 처리해야 할 offset을 모두 더하는(누산기) 기능이다. 잘 동작한다.
var totalLag: Long = 0
def printLag(rdd: RDD[ConsumerRecord[String, String]]): Unit = {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
totalLag += o.count()
}
println(s"******************total lag : $totalLag")
totalLag = 0
}