백프레셔(Backpressure) – 스트리밍 작업을 처리하다가 폭발적인 데이터(예, 사건/사고, 이벤트)가 발생하면 처리 시스템은 폭발적인 데이터를 우아하게 처리할 수 있어야 한다. 


처리 시간이 배치 간격보다 커지면 다음 배치 잡에서는 지연이 생기고 불안정해 진다. 따라서 불안정 상태가 지속되면 백프레셔에 의해 입력율(input rate)를 줄여 처리량과 처리 시간을 줄인다. 따라서 지연이 0이 될 것이다. 


폭발적인 데이터가 갑자기 카프카에 저장되어 스파크 스트리밍의 카프카 컨슈머에 리턴하는 배치 크기를 제한하고 싶을 수 있다. 이럴 때 스파크 스트리밍 백프레셔를 적용할 수 있다. (이는 대부분의 스트리밍 처리 플랫폼(예, storm, flink)에서 제공된다.)



spark.streaming.backpressure.enabled와 spark.streaming.backpressure.initialRate를 사용하면 된다. 

spark.streaming.backpressure.initialRate 기본값은 not set이고,  spark.streaming.backpressure.enabled 기본값은  disabled이다. 



https://spark.apache.org/docs/latest/configuration.html#spark-streaming 설정에 잘 설명되어 있다.


스파크 스트리밍은 spark.streaming.backpressure.enabled를 통해 현재 배치 스케줄링 지연과 처리 시간을 기준으로 수신 속도를 제어할 수 있기 때문에 시스템이 최대한 빠르게 처리할 수 있다. 내부적으로는 수신자의 최대 수신 속도가 동적으로 설정된다. 이 속도는 spark.streaming.receiver.maxRate와 spark.streaming.kafka.maxRatePerPartition 상한 값으로 설정된다.


첫 번째 배치를 제어하거나 좀 더 구체적으로 첫 번째 배치의 메시지 수를 설정하고 싶다면, spark.streaming.backpressure.initialRate을 사용할 수 있다. spark.streaming.backpressure.initialRate은 백프레셔 메커니즘이 활성화(spark.streaming.backpressure.enabled=true)되었을 때 각 리시버가 첫번째 배치에 대한 데이터를 수신하는 최대 수신 속도이다. 


spark.streaming.kafka.maxRatePerPartition의 기본값은 not set인고, 카프카 direct stream API를 사용할 때 각 카프카 파티션에서 데이터를 읽을 최대 속도(초당 레코드 수)으로 설정한다.



예)

spark.streaming.kafka.maxRatePerPartition = "100"

spark.streaming.backpressure.enabled = "true"






Posted by '김용환'
,

[펌] kafka burrow api

kafka 2018. 11. 20. 14:46


카프카 api stat, metric 지표를 보기 위한 burrow(https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint)라는 오픈 소스 툴이 있다.



kafka에서 사용하고 싶은 http endpoint를 보고 호출한다.


https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint#request-endpoints


예)

http://burrow.google.io:8000/v3/kafka/mycluster/topic/logs

{"error":false,"message":"topic offsets returned","offsets":[152223734586,152224559773,152224774276,152224723888,152224644847,152224641838,152224508250,152224383547,152215033727,152215053018,152214434045,152214530227,152215253175,152214990582,152215431432,152213601661,152215273301,152215092394,152214795862,152215123946,152215194674,152215391037,152214877453,152215137734,152215680569,152215360097,152214928462,152215484025,152214933673,152214661665,152214049830,152215021533,152215748420,152214945335,152215126831,152215051384,152214863230,152214966710,152215634739,152214820473,152215165668,152215071434,152214866458,152214865355,152214934334,152214662023,152214830751,152214573022,152215197587,152214836785],"request":{"url":"/v3/kafka/mycluster/topic/logs","host":"burrow-google"}}







Posted by '김용환'
,


apache phoenix 에서 timestamp 타입을 utc로 저장했다.



이를 현재 시간(gmt +9)로 보고 싶다면 아래와 같은 쿼리를 실행한다.




select CONVERT_TZ(timestamp, 'UTC', 'Asia/Seoul')




기타 phoenix 함수는 아래를 참조한다.


https://phoenix.apache.org/language/functions.html


Posted by '김용환'
,