NIFI를 처음 대하는 Devops를 위해서 남겨둔다. 



tail을 이용해 local cassadnra에 저장하는 예제이다. 

예제 데이터는 다음과 같다. 


tail-cassandra.xml



cassandra 스키마는 다음과 같이 진행했다. NIFI의 ExecuteScript에서 Groovy를 사용할 수 있으나. 너무 지저분한 코드에.. 간단하게 생성한다.

CREATE TABLE log.recent_logs (

    service_tag text,

    ts timestamp,

    uuid text,

    log text,

    PRIMARY KEY (service_tag, ts, uuid)

) WITH CLUSTERING ORDER BY (ts DESC, uuid ASC)

    AND bloom_filter_fp_chance = 0.01

    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}

    AND comment = ''

    AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}

    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}

    AND crc_check_chance = 1.0

    AND dclocal_read_repair_chance = 0.1

    AND default_time_to_live = 0

    AND gc_grace_seconds = 864000

    AND max_index_interval = 2048

    AND memtable_flush_period_in_ms = 0

    AND min_index_interval = 128

    AND read_repair_chance = 0.0

    AND speculative_retry = '99PERCENTILE';



NIFI 구성은 다음과 같다. 한건 보냈고 한건 잘 받았다는 내용이다. 이 부분을 잘 기억해두는게 좋다. 초반에 동작이 되는지 안되는지를 정확하게 몰라서 헤맸다..;;;




 log 파일을 cassandra에 저장하는 예이다. 


tail 로그로 저장할 파일은 다음과 같다.


{  "pid": "12329", "severity": "INFO", "ident": "openstack.nova.compute.resource_tracker", "message": "[req-cfb4dd79-3bd2-4d3b-833d-3763264edbfd - - - - -] Compute_service record updated for google-seoul-com033:google-seoulcom033.google.io", "hostname": "google-seoul-com033", "node": "nova_compute", "type": "nova_compute", "phase": "seoul", "timestamp": "2017-12-01T20:38:28+09:00", "uuid": "5e919297-ff3e-4574-83fc-0b4cc69945f4", "crow_svc_tag": "public.krane", "service_tag": "google.openstack"  }



* 순서는 다음과 같다. 


로그를 json으로 받는다 ->

ExtractText Processor를 사용해 log에 전체 로그((.*))  저장한다. (.*) 중요함!!! ->

EvaluateJsonPath Processor를 사용해 json 로그의 일부 데이터를 저장한다. uuid 키에 $.uuid 를 저장한다

NIFI 내부에서는 https://github.com/json-path/JsonPath를 사용한다.(다만 전체 json 데이터를 저장 못하니. 

ExtractText를 사용해야 한다)  ->


CQL을 생성하기 위해 ReplaceText Processor를 사용한다.  -> 


INSERT INTO log.recent_logs (service_tag, ts, uuid, log) VALUES ('${service_tag}', ${ts}, '${uuid}', '${log}') USING TTL 3600;



데이터는 cassandra에 저장된다. 




* 주의할 점이 많다.



1. Processor는 모두 실행(Start) 가능한 상태지만, start를 시킬 수 없으면 먼가 잘못한 것이다.(가장 난관이었음)



2. Processor에서 작업한 결과를 property로 남기면 해당 그룹내에서 전역으로 사용할 수 있다. 


3. ExtractText → json 모두 ((.*)로 설정해야 함)


4. EvaluateJsonPath → json 일부 데이터를 jsonPath를 이용해 $.service_tags 이렇게 . (EvaluateJsonPath에서는 json 통 데이터를 파싱할 수 없습니다. flowfile-attribute를 사용한 것을 유의해야 한다)


5. PutCassandraQL은 그냥 저장 위치와 keyspace만 정하는 컴포넌트라서, 해당 데이터를 모두 merge하기 위해 ReplaceText를 사용해 cassandra query(CQL)를 생성한다. 이는 스토리지 컴포넌트 모두에 해당된다



6. 원래 Processor의 결과(아래 Processor의 경우 failure, matched, unmatched) 모두를 다른 Processor에서 처리할 수 있게 해야 한다. 그러나 matched만 처리하게 나머지는 처리하고 싶지 않다면 Settings/Automatically Terminate Relationships를 체크온(checkon)하면 Processor는 동작 가능 상태(Start 아이콘 생김)가 나타난다.






7. 제대로 들어오는지 디버깅하려면 마지막 PutCassandraQL을 중지 시키면. 큐에 쌓인 데이터를 볼 수 있다. 그걸로 자세히 볼 수 있다. 


8. Processor에서 처리를 하지 못하면 Processor의 오른쪽 위에 빨간색 박스가 나타나서 간단한 에러를 볼 수 있다




* 공부 자료


1. Processor 에 도움되는 개념.
https://community.hortonworks.com/articles/57803/using-nifi-gettwitter-updateattributes-and-replace.html


2. 각종 예제 

https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates

(한 번  해봐야 이해한다)


3. stack overflow와 hortonworks community를 최대한 활용하기.



'Cloud' 카테고리의 다른 글

NIFI의 provenance 의 drop event  (0) 2017.12.12
NIFI 팁  (0) 2017.12.08
Bad neighbors/Noisy neighbors  (0) 2017.11.15
ha_proxy 인증서 pem 파일  (0) 2017.11.10
[링크] 오픈스택 firewall  (0) 2017.11.08
Posted by '김용환'
,