스파크 스트리밍 처리할 때 누산기(accumulator) 같이 처리해야 할 때가 있다. 


아래 예시는 처리해야 할 offset을 모두 더하는(누산기) 기능이다. 잘 동작한다.



  var totalLag: Long = 0


  def printLag(rdd: RDD[ConsumerRecord[String, String]]): Unit = {

    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.foreachPartition { iter =>

      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)

      totalLag += o.count()

    }

    println(s"******************total lag : $totalLag")

    totalLag = 0

  }



Posted by '김용환'
,