NIFI 프로세서에서 마지막 프로세스의 provenance history를 보면 event type을 보다가 DROP이라는 것이 눈에 띄여 조사를 해보았다.


DROP 이벤트 타입의 경우 Detail을 보면 Auto-Terminated by success Relationship으로 나타난다.



관련된 내용은 다음을 참고한다. 


https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html



NIFI의 Data Provenance 기능은 수신된 각 데이터에서 무슨 일이 발생했는지 정확히 알 수 있다 FlowFile이 수신된 시간, 수정된 시간, 특정 방식으로 라우팅된 시간, 전송 시간 등을 보여주는 그래프를 제공한다. 또한 그래프에서 이벤트가 발생한 이유를 알 수 있다. 


이를 구분하는 단계로 Event Type이 구분하는 역할을 진행한다. 


DROP 타입은 FlowFile의 끝을 의미하는 것이고 Detail에 따라 성공했기 때문에 자동 종료했음을 나타내는 의미한다. 


 

'Cloud' 카테고리의 다른 글

[fluentd]의 fluent-plugin-forest(forest)  (0) 2018.01.22
[fluentd] filter/record에 예약어 존재  (0) 2017.12.18
NIFI의 provenance 의 drop event  (0) 2017.12.12
NIFI 팁  (0) 2017.12.08
[nifi] tail -> cassandra 저장 예제  (0) 2017.12.01
Bad neighbors/Noisy neighbors  (0) 2017.11.15
Posted by 김용환 '김용환'

NIFI 팁

Cloud 2017.12.08 10:59


NIFI 공부에 도움되는 문서


https://nifi.apache.org/docs/nifi-docs/html/user-guide.html


https://www.slideshare.net/KojiKawamura/apache-nifi-10-in-nutshell


https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#Reporting_Tasks





* nifi 1.4 (최신) 설치


https://nifi.apache.org/download.html



* 설정

미리 zk 설정되야 한다. 



<nifi.properties 수정>



nifi.web.http.host=장비 이름


nifi.cluster.is.node=true

nifi.cluster.node.address=장비 이름



nifi.zookeeper.connect.string=zk서버들

nifi.zookeeper.connect.timeout=3 secs

nifi.zookeeper.session.timeout=3 secs

nifi.zookeeper.root.node=/samuel-nifi




<bootstrap.conf 수정>


# JVM memory settings

java.arg.2=-Xms16g

java.arg.3=-Xmx16g





* 시작


5대를 클러스터링할 수 있도록 설정을 수정하니 약 3분 정도 소요된다. (완전 좋은 장비를 이용했다)

클러스터링 시작 시간은 3분(디폴트)이며 수정가능하다. 수정한 내용은 뒤에서 설명..




$ bin/nifi.sh start


Java home: /usr/java/default

NiFi home: /home/deploy/nifi-1.4.0


Bootstrap Config File: /home/deploy/nifi-1.4.0/conf/bootstrap.conf




* 확인하기 


$ bin/nifi.sh  status


Java home: /usr/java/default

NiFi home: /home/deploy/nifi-1.4.0


Bootstrap Config File: /home/deploy/nifi-1.4.0/conf/bootstrap.conf


2017-12-07 15:30:32,753 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is currently running, listening to Bootstrap on port 57300, PID=60012


로그(logs)를 보면서 정상적으로 동작하는 지 확인할 수 있다. 




* 중지/재시작 후 


$ bin/nifi.sh stop

$ bin/nifi.sh start





* 설정 수정 


1. master election 시간은 5 mins이라서 수정하는 것이 좋다.


nifi.cluster.flow.election.max.wait.time=1 mins



2. nifi 웹에서의 디폴트 타임아웃이 응근히 짧다. 5초인데.. single web application으로 구현된 UI라 좀 걸릴 수 있어서 느리게 해도 된다.


nifi.cluster.node.connection.timeout=30 sec

nifi.cluster.node.read.timeout=30 sec




3. 그룹 Thread 디폴트 값은 1이라서 그룹 Thread 크기를 늘리는 것이 좋다. 


Maximum Timer Driven Thread Count 

Maximum Event Driven Thread Count 





4. 커보로스 파일 설정은 다음과 같다. 


nifi.kerberos.krb5.file=/etc/krb5.conf



5. 재시작시 자동 플레이(autoresume)이 안되게 한다.

빅 데이터 처리로 이미 클러스터링이 깨진 상태에서 


nifi.flowcontroller.autoResumeState=false



 


* 주의 할 점


1. 

하둡 관련된 부분을 수정하면 nifi 전체 재시작을 해야 한다. 바로 동작되지 않는다. 




2.

클러스터링이 끊어진 상태에서 특정 서버에서만 설정을 바꾼 경우.. 다

서버 상태는 Status라고 나오지만,, 다음과 같은 에러가 발생할 수 있다. 



2017-12-07 19:22:40,170 ERROR [main] o.a.nifi.controller.StandardFlowService Failed to load flow from cluster due to: org.apache.nifi.controller.UninheritableFlowException: Failed to connect node to cluster because local flow is different than cluster flow.

org.apache.nifi.controller.UninheritableFlowException: Failed to connect node to cluster because local flow is different than cluster flow.

at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:937)



문제된 서버에 클러스터링된 conf/flow.xml.gz을 삭제하고 다른 서버의 conf/flow.xml.gz을 복사한 후, 재시작하면 잘 연결된다. 


nifi.properties에 설정 파일을 가르킬 수 있다. 

nifi.flow.configuration.file=./conf/flow.xml.gz



3. 


스토리지가 죽으면 nifi는 열리지 않는다.. bottleneck은 스토리지이다. 최대 개수의 효과를 누리고 싶지만 잘못하고 connection 수가 너무 많아 storage 죽을 수 있다는 점을 명심할 필요가 있다. 



4.  

nifi web이 동작하지 못할 정도로 문제가 되어 강제 재시작하면 다음 에러가 발생할 수 있다. 


2017-12-07 20:43:38,006 ERROR [NiFi logging handler] org.apache.nifi.StdErr

2017-12-07 20:43:38,007 ERROR [NiFi logging handler] org.apache.nifi.StdErr Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "logback-1"

2017-12-07 20:44:32,213 ERROR [NiFi logging handler] org.apache.nifi.StdErr

2017-12-07 20:44:32,213 ERROR [NiFi logging handler] org.apache.nifi.StdErr Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "cluster1-timeouter-0"

2017-12-07 20:44:56,169 ERROR [NiFi logging handler] org.apache.nifi.StdErr

2017-12-07 20:44:56,169 ERROR [NiFi logging handler] org.apache.nifi.StdErr Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Listen to Bootstrap"

2017-12-07 20:45:08,175 ERROR [NiFi logging handler] org.apache.nifi.StdErr

2017-12-07 20:45:08,175 ERROR [NiFi logging handler] org.apache.nifi.StdErr Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Provenance Maintenance Thread-2"

2017-12-07 20:46:08,291 ERROR [NiFi logging handler] org.apache.nifi.StdErr

2017-12-07 20:46:08,292 ERROR [NiFi logging handler] org.apache.nifi.StdErr Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "StandardProcessScheduler Thread-7"

2017-12-07 20:46:14,292 ERROR [NiFi logging handler] org.apache.nifi.StdErr

2017-12-07 20:46:14,293 ERROR [NiFi logging handler] org.apache.nifi.StdErr Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Provenance Maintenance Thread-3"

2017-12-07 20:47:14,481 ERROR [NiFi logging handler] org.apache.nifi.StdErr

2017-12-07 20:47:14,482 ERROR [NiFi logging handler] org.apache.nifi.StdErr Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Timer-Driven Process Thread-37"




그래도 안되면 재시작하고.. 다시 재시작하면 된다.

그러면서 NIFI는 split brain (split network)이 생기기도 하니.. 

최대한 NIFI를 죽지 않게 잘 운영하는 것이 중요한 부분이 되지 않을까 생각되었다. cpu 부하가 50%를 넘게 하지 않는 운영의 묘가 필요한 것 같다. 




5. 모니터링 정보는 다음과 같다. 


https://nifi.apache.org/docs/nifi-docs/rest-api/

 

https://community.hortonworks.com/questions/69004/nifi-monitoring-processor-and-nifi-service.html


'Cloud' 카테고리의 다른 글

[fluentd] filter/record에 예약어 존재  (0) 2017.12.18
NIFI의 provenance 의 drop event  (0) 2017.12.12
NIFI 팁  (0) 2017.12.08
[nifi] tail -> cassandra 저장 예제  (0) 2017.12.01
Bad neighbors/Noisy neighbors  (0) 2017.11.15
ha_proxy 인증서 pem 파일  (0) 2017.11.10
Posted by 김용환 '김용환'



NIFI를 처음 대하는 Devops를 위해서 남겨둔다. 



tail을 이용해 local cassadnra에 저장하는 예제이다. 

예제 데이터는 다음과 같다. 


tail-cassandra.xml



cassandra 스키마는 다음과 같이 진행했다. NIFI의 ExecuteScript에서 Groovy를 사용할 수 있으나. 너무 지저분한 코드에.. 간단하게 생성한다.

CREATE TABLE log.recent_logs (

    service_tag text,

    ts timestamp,

    uuid text,

    log text,

    PRIMARY KEY (service_tag, ts, uuid)

) WITH CLUSTERING ORDER BY (ts DESC, uuid ASC)

    AND bloom_filter_fp_chance = 0.01

    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}

    AND comment = ''

    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}

    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}

    AND crc_check_chance = 1.0

    AND dclocal_read_repair_chance = 0.1

    AND default_time_to_live = 0

    AND gc_grace_seconds = 864000

    AND max_index_interval = 2048

    AND memtable_flush_period_in_ms = 0

    AND min_index_interval = 128

    AND read_repair_chance = 0.0

    AND speculative_retry = '99PERCENTILE';



NIFI 구성은 다음과 같다. 한건 보냈고 한건 잘 받았다는 내용이다. 이 부분을 잘 기억해두는게 좋다. 초반에 동작이 되는지 안되는지를 정확하게 몰라서 헤맸다..;;;




 log 파일을 cassandra에 저장하는 예이다. 


tail 로그로 저장할 파일은 다음과 같다.


{  "pid": "12329", "severity": "INFO", "ident": "openstack.nova.compute.resource_tracker", "message": "[req-cfb4dd79-3bd2-4d3b-833d-3763264edbfd - - - - -] Compute_service record updated for google-seoul-com033:google-seoulcom033.google.io", "hostname": "google-seoul-com033", "node": "nova_compute", "type": "nova_compute", "phase": "seoul", "timestamp": "2017-12-01T20:38:28+09:00", "uuid": "5e919297-ff3e-4574-83fc-0b4cc69945f4", "crow_svc_tag": "public.krane", "service_tag": "google.openstack"  }



* 순서는 다음과 같다. 


로그를 json으로 받는다 ->

ExtractText Processor를 사용해 log에 전체 로그((.*))  저장한다. (.*) 중요함!!! ->

EvaluateJsonPath Processor를 사용해 json 로그의 일부 데이터를 저장한다. uuid 키에 $.uuid 를 저장한다

NIFI 내부에서는 https://github.com/json-path/JsonPath를 사용한다.(다만 전체 json 데이터를 저장 못하니. 

ExtractText를 사용해야 한다)  ->


CQL을 생성하기 위해 ReplaceText Processor를 사용한다.  -> 


INSERT INTO log.recent_logs (service_tag, ts, uuid, log) VALUES ('${service_tag}', ${ts}, '${uuid}', '${log}') USING TTL 3600;



데이터는 cassandra에 저장된다. 




* 주의할 점이 많다.



1. Processor는 모두 실행(Start) 가능한 상태지만, start를 시킬 수 없으면 먼가 잘못한 것이다.(가장 난관이었음)



2. Processor에서 작업한 결과를 property로 남기면 해당 그룹내에서 전역으로 사용할 수 있다. 


3. ExtractText → json 모두 ((.*)로 설정해야 함)


4. EvaluateJsonPath → json 일부 데이터를 jsonPath를 이용해 $.service_tags 이렇게 . (EvaluateJsonPath에서는 json 통 데이터를 파싱할 수 없습니다. flowfile-attribute를 사용한 것을 유의해야 한다)


5. PutCassandraQL은 그냥 저장 위치와 keyspace만 정하는 컴포넌트라서, 해당 데이터를 모두 merge하기 위해 ReplaceText를 사용해 cassandra query(CQL)를 생성한다. 이는 스토리지 컴포넌트 모두에 해당된다



6. 원래 Processor의 결과(아래 Processor의 경우 failure, matched, unmatched) 모두를 다른 Processor에서 처리할 수 있게 해야 한다. 그러나 matched만 처리하게 나머지는 처리하고 싶지 않다면 Settings/Automatically Terminate Relationships를 체크온(checkon)하면 Processor는 동작 가능 상태(Start 아이콘 생김)가 나타난다.






7. 제대로 들어오는지 디버깅하려면 마지막 PutCassandraQL을 중지 시키면. 큐에 쌓인 데이터를 볼 수 있다. 그걸로 자세히 볼 수 있다. 


8. Processor에서 처리를 하지 못하면 Processor의 오른쪽 위에 빨간색 박스가 나타나서 간단한 에러를 볼 수 있다




* 공부 자료


1. Processor 에 도움되는 개념.
https://community.hortonworks.com/articles/57803/using-nifi-gettwitter-updateattributes-and-replace.html


2. 각종 예제 

https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates

(한 번  해봐야 이해한다)


3. stack overflow와 hortonworks community를 최대한 활용하기.



'Cloud' 카테고리의 다른 글

NIFI의 provenance 의 drop event  (0) 2017.12.12
NIFI 팁  (0) 2017.12.08
[nifi] tail -> cassandra 저장 예제  (0) 2017.12.01
Bad neighbors/Noisy neighbors  (0) 2017.11.15
ha_proxy 인증서 pem 파일  (0) 2017.11.10
[링크] 오픈스택 firewall  (0) 2017.11.08
Posted by 김용환 '김용환'



클라우드 환경에서는 

특정 물리 장비(pm)에서 동작하는 여러 vm 중 특정 vm이 자원을 많이 사용함으로서 다른 vm이 자원을 사용하지 못하는 현상을 bad neighbor 또는 noisy neighbor라고 한다.



http://searchcloudcomputing.techtarget.com/definition/noisy-neighbor-cloud-computing-performance


http://www.clubcloudcomputing.com/2012/06/bad-neighbors-in-the-cloud/




자원 독점을 qos로 막을 수 있다. 




http://events.linuxfoundation.org/sites/events/files/slides/Achieving%20QoS%20in%20Server%20Virtualization.pdf


'Cloud' 카테고리의 다른 글

NIFI 팁  (0) 2017.12.08
[nifi] tail -> cassandra 저장 예제  (0) 2017.12.01
Bad neighbors/Noisy neighbors  (0) 2017.11.15
ha_proxy 인증서 pem 파일  (0) 2017.11.10
[링크] 오픈스택 firewall  (0) 2017.11.08
우버의 스트리밍 분석 플랫폼 - AthenaX  (0) 2017.11.01
Posted by 김용환 '김용환'



ha_proxy 인증서 파일은 하나로 합쳐서 사용한다.


인증서(crt)

체인키(chainca.crt)

개인키(key)


https://www.securesign.kr/guides/HAProxy-SSL-Certificates-Install

Posted by 김용환 '김용환'


neutron 레벨

https://wiki.openstack.org/wiki/Neutron/FWaaS


ovs 레벨

https://docs.openstack.org/newton/networking-guide/config-ovsfwdriver.html



Posted by 김용환 '김용환'



우버(uber)에서 apache flink와 apache calcite 기반으로 SQL 기반의 스트리밍 분석 플랫폼(athenaX)을 내어놓았다.


https://eng.uber.com/athenax/


https://github.com/uber/AthenaX







잼난 부분을 소개하면 다음과 같다.


우버 개발자가 apache flink에 윈도우 관련 내용과 complex 타입을 지원하도록 PR을 보냈다. 기여했다.

https://issues.apache.org/jira/browse/FLINK-6377

https://github.com/apache/flink/pull/3665










기타 : 볼만한 내용


https://data-artisans.com/blog/session-windowing-in-flink



Streaming SQL from Julian Hyde




Introduction to Apache Calcite from Jordan Halterman




apache phoenix에서 apche calcite로 넘어가는 일은 얼마 안남은 듯.. 

Posted by 김용환 '김용환'


VM 생성 때 못한 작업에 대한 후처리

1) youtube cloud init example


2) documentation

Automating Openstack with cloud init run a script on VM's first boot


https://raymii.org/s/tutorials/Automating_Openstack_with_Cloud_init_run_a_script_on_VMs_first_boot.html




3) example


https://arnesund.com/2015/02/05/how-to-use-cloud-init-to-customize-new-openstack-vms/

Posted by 김용환 '김용환'



https://platform9.com/blog/kubernetes-vs-mesos-marathon/





Kubernetes offers significant advantages over Mesos + Marathon for three reasons:

  • Much wider adoption by the DevOps and containers community
  • Better scheduling options for pods, useful for complex application stacks
  • Based on over a decade of experience at Google

However, Kubernetes has been known to be difficult to deploy and manage. Platform9’s Managed Kubernetes product can fill this gap by letting organizations focus on deploying microservices on Kubernetes, instead of managing and upgrading a highly available Kubernetes deployment themselves. Further details on these and other deployment models for Kubernetes can be found in The Ultimate Guide to Deploy Kubernetes.

Posted by 김용환 '김용환'



fluentd에서 [error]: exec failed to emit error="queue size exceeds limit" error_class="Fluent::BufferQueueLimitError" tag="google.plus" 이라는 에러가 발생한다면,

버퍼 설정이 어떻게 되어 있는지 확인한다. 



type설정만 forward로 되어 있다.



type forward 

flush_interval 10s



forward 설정이 기본 memory기반이고 기본적으로 큐 크기가 64, 청크 크기는 8mb(큐에 데이터 쌓는 사이즈)인데,

flush 간격 10초 동안 생성된 로그가 큐 크기를 넘어섰다. 



https://docs.fluentd.org/v0.12/articles/out_forward#buffer_queue_limit,-buffer_chunk_limit


buffer_queue_limit, buffer_chunk_limit

The length of the chunk queue and the size of each chunk, respectively. Please see the Buffer Plugin Overview article for the basic buffer structure. The default values are 64 and 8m, respectively. The suffixes “k” (KB), “m” (MB), and “g” (GB) can be used for buffer_chunk_limit.



memory의 제약 사항이 있기 때문에 임시파일을 생성해서 bulk로 flush하는 형태를 사용하면 된다. 



  buffer_type file

  buffer_path /var/log/td-agent/buffer/fluentd.buffer

  buffer_chunk_limit 16m

  buffer_queue_limit 128

  

  

*** 

buffer_type을 주지 않으면 메모리 기반이고, 저장 용량이 작기 때문에 로그 내용이 drop 될 수 있다. 


Posted by 김용환 '김용환'