Apache Storm requires Apache ZooKeeper.




* Installing and running Apache ZooKeeper

http://www.apache.org/dyn/closer.cgi/zookeeper/



$ ./zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper-3.4.10/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED






* Installing Apache Storm


Download Storm from http://storm.apache.org/downloads.html and move it to /usr/local.


(Older releases are archived at http://archive.apache.org/dist/storm/. This walkthrough uses 1.0.1.)



Change the configuration so that Storm points at the local ZooKeeper.


$ vi conf/storm.yaml


storm.zookeeper.servers:

      - localhost

nimbus.seeds: ["localhost"]





Start Nimbus, the master node.



$ ./bin/storm nimbus

Running: /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/bin/java -server -Ddaemon.name=nimbus .... org.apache.storm.daemon.nimbus




Start a Supervisor node.


$ ./bin/storm supervisor

Running: /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/bin/java -server -Ddaemon.name=supervisor... org.apache.storm.daemon.supervisor




Also start the UI daemon for the admin web UI.


$ ./bin/storm ui

Running: /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/bin/java -server -Ddaemon.name=ui ... org.apache.storm.ui.core



Open http://localhost:8080 in a web browser and confirm that it loads correctly.




Check the daemons.

$ ps -ef | grep apache-storm


You can confirm that the three daemons, nimbus, ui, and supervisor, are up and running.


  


For a test, submit the wordcount topology.


$ ./bin/storm jar ./examples/storm-starter/storm-starter-topologies-1.0.1.jar org.apache.storm.starter.WordCountTopology wordcount

Running: /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-1.0.1 -Dstorm.log.dir=/usr/local/apache-storm-1.0.1/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-1.0.1/lib/asm-5.0.3.jar:/usr/local/apache-storm-1.0.1/lib/clojure-1.7.0.jar:/usr/local/apache-storm-1.0.1/lib/disruptor-3.3.2.jar:/usr/local/apache-storm-1.0.1/lib/kryo-3.0.3.jar:/usr/local/apache-storm-1.0.1/lib/log4j-api-2.1.jar:/usr/local/apache-storm-1.0.1/lib/log4j-core-2.1.jar:/usr/local/apache-storm-1.0.1/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/apache-storm-1.0.1/lib/log4j-slf4j-impl-2.1.jar:/usr/local/apache-storm-1.0.1/lib/minlog-1.3.0.jar:/usr/local/apache-storm-1.0.1/lib/objenesis-2.1.jar:/usr/local/apache-storm-1.0.1/lib/reflectasm-1.10.1.jar:/usr/local/apache-storm-1.0.1/lib/servlet-api-2.5.jar:/usr/local/apache-storm-1.0.1/lib/slf4j-api-1.7.7.jar:/usr/local/apache-storm-1.0.1/lib/storm-core-1.0.1.jar:/usr/local/apache-storm-1.0.1/lib/storm-rename-hack-1.0.1.jar:./examples/storm-starter/storm-starter-topologies-1.0.1.jar:/usr/local/apache-storm-1.0.1/conf:/usr/local/apache-storm-1.0.1/bin -Dstorm.jar=./examples/storm-starter/storm-starter-topologies-1.0.1.jar org.apache.storm.starter.WordCountTopology wordcount

534  [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8806961472207405752:-5043320726578157353

585  [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []

640  [main] INFO  o.a.s.StormSubmitter - Uploading topology jar ./examples/storm-starter/storm-starter-topologies-1.0.1.jar to assigned location: /usr/local/apache-storm-1.0.1/storm-local/nimbus/inbox/stormjar-609f1ee2-82e5-4111-bba2-c36b830e0b15.jar

Start uploading file './examples/storm-starter/storm-starter-topologies-1.0.1.jar' to '/usr/local/apache-storm-1.0.1/storm-local/nimbus/inbox/stormjar-609f1ee2-82e5-4111-bba2-c36b830e0b15.jar' (62432746 bytes)

[==================================================] 62432746 / 62432746

File './examples/storm-starter/storm-starter-topologies-1.0.1.jar' uploaded to '/usr/local/apache-storm-1.0.1/storm-local/nimbus/inbox/stormjar-609f1ee2-82e5-4111-bba2-c36b830e0b15.jar' (62432746 bytes)

935  [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/apache-storm-1.0.1/storm-local/nimbus/inbox/stormjar-609f1ee2-82e5-4111-bba2-c36b830e0b15.jar

936  [main] INFO  o.a.s.StormSubmitter - Submitting topology wordcount in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8806961472207405752:-5043320726578157353","topology.workers":3,"topology.debug":true}

1196 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: wordcount


You can see the job being uploaded and confirm that it runs correctly.




The actual source directory is at https://github.com/apache/storm/tree/1.0.x-branch/examples/storm-starter.




The main method is at the link below.


https://github.com/apache/storm/blob/1.0.x-branch/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java


It uses LocalCluster for local testing and builds a topology of Spout -> Bolt("split") -> Bolt("count").




  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    // spout emitting random sentences, with 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    // "split" tokenizes each sentence; "count" counts words, with fieldsGrouping on
    // "word" so the same word is always routed to the same count task
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      // a topology name was given: submit to the cluster (the "storm jar ... wordcount" path)
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      // no name given: run in-process with LocalCluster for 10 seconds, then shut down
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

      Thread.sleep(10000);

      cluster.shutdown();
    }
  }
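
For reference, the "count" bolt is an ordinary BaseBasicBolt that keeps an in-memory map of counts per task. Below is a minimal sketch of it, pulled out as a top-level class for readability; it follows the storm-starter implementation closely, but treat it as illustrative rather than a verbatim copy.

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCount extends BaseBasicBolt {
  // Per-task in-memory counts; fieldsGrouping on "word" guarantees
  // the same word is always handled by the same task.
  private final Map<String, Integer> counts = new HashMap<>();

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null) {
      count = 0;
    }
    count++;
    counts.put(word, count);
    // this emit is what shows up in worker.log as "Emitting: count default [cow, 4976]"
    collector.emit(new Values(word, count));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Because of the fieldsGrouping on "word", each count task only ever sees its own slice of the vocabulary, which is why a plain per-task HashMap is enough.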




Checking the daemons again, you can see six more processes besides the three main daemons. (Note the ports 6700~6702: these are the default worker slots defined by supervisor.slots.ports.)


$ ps -ef | grep apache-storm


  ...  org.apache.storm.daemon.worker wordcount-1-1503398931 6dd4cbd3-1f43-4f17-a90f-b470fddbccbc 6700 28fae52d-9d91-4de8-9130-fe1dff2f2487

... org.apache.storm.daemon.worker wordcount-1-1503398931 6dd4cbd3-1f43-4f17-a90f-b470fddbccbc 6701 8b53b715-0184-4639-b33f-68208cc7d2e3

...  org.apache.storm.daemon.worker wordcount-1-1503398931 6dd4cbd3-1f43-4f17-a90f-b470fddbccbc 6702 e8357b79-dd08-4d70-b4de-2bf69598eadf

There are three more similar entries.

 


 


You can check in a web browser whether it ran correctly.


http://localhost:8080/topology.html?id=wordcount-1-1503398931

  



You can also check the API summary by calling http://localhost:8080/api/v1/topology/summary.


$ curl http://localhost:8080/api/v1/topology/summary

{"topologies":[{"assignedTotalMem":2496.0,"owner":"samuel.kim","requestedMemOnHeap":0.0,"encodedId":"wordcount-1-1503398931","assignedMemOnHeap":2496.0,"id":"wordcount-1-1503398931","uptime":"11m 28s","schedulerInfo":null,"name":"wordcount","workersTotal":3,"status":"INACTIVE","requestedMemOffHeap":0.0,"tasksTotal":28,"requestedCpu":0.0,"replicationCount":1,"executorsTotal":28,"uptimeSeconds":688,"assignedCpu":0.0,"assignedMemOffHeap":0.0,"requestedTotalMem":0.0}],"schedulerDisplayResource":false}




You can list the topologies.


$ ./bin/storm list

Topology_name        Status     Num_tasks  Num_workers  Uptime_secs

-------------------------------------------------------------------

wordcount            ACTIVE     28         3            370




To check the logs, look under the logs directory; the ports 6700~6702 seen earlier with ps -ef show up there.


$ ls -al logs/workers-artifacts/wordcount-1-1503398931/670

6700/ 6701/ 6702/




$ ls -al logs/workers-artifacts/wordcount-1-1503398931/6700/

gc.log.0.current    worker.log          worker.log.err      worker.log.metrics  worker.log.out      worker.pid          worker.yaml




Take a look at one of them to see whether it worked properly.


$ ls -al logs/workers-artifacts/wordcount-1-1503398931/6700/worker.log

-rw-r--r--  1 samuel.kim  staff  48832569  8 22 19:57 logs/workers-artifacts/wordcount-1-1503398931/6700/worker.log

[/usr/local/apache-storm-1.0.1] tail -n 10 logs/workers-artifacts/wordcount-1-1503398931/6700/worker.log

2017-08-22 19:57:46.245 o.a.s.d.executor [INFO] Processing received message FOR 10 TUPLE: source: split:21, stream: default, id: {}, [cow]

2017-08-22 19:57:46.245 o.a.s.d.executor [INFO] BOLT ack TASK: 13 TIME:  TUPLE: source: split:22, stream: default, id: {}, ["away"]

2017-08-22 19:57:46.245 o.a.s.d.task [INFO] Emitting: count default [cow, 4976]

2017-08-22 19:57:46.245 o.a.s.d.executor [INFO] Execute done TUPLE source: split:22, stream: default, id: {}, ["away"] TASK: 13 DELTA:

2017-08-22 19:57:46.245 o.a.s.d.executor [INFO] BOLT ack TASK: 10 TIME:  TUPLE: source: split:21, stream: default, id: {}, [cow]

2017-08-22 19:57:46.245 o.a.s.d.executor [INFO] Execute done TUPLE source: split:21, stream: default, id: {}, [cow] TASK: 10 DELTA:

2017-08-22 19:57:46.246 o.a.s.d.executor [INFO] Processing received message FOR 7 TUPLE: source: split:19, stream: default, id: {}, ["am"]

2017-08-22 19:57:46.246 o.a.s.d.task [INFO] Emitting: count default [am, 5050]

2017-08-22 19:57:46.246 o.a.s.d.executor [INFO] BOLT ack TASK: 7 TIME:  TUPLE: source: split:19, stream: default, id: {}, ["am"]

2017-08-22 19:57:46.246 o.a.s.d.executor [INFO] Execute done TUPLE source: split:19, stream: default, id: {}, ["am"] TASK: 7 DELTA:






You can also deactivate the topology.


$ ./bin/storm deactivate wordcount

1650 [main] INFO  o.a.s.c.deactivate - Deactivated topology: wordcount




Check the status with list.


$ ./bin/storm list

Topology_name        Status     Num_tasks  Num_workers  Uptime_secs

-------------------------------------------------------------------

wordcount            INACTIVE   28         3            617








Now kill it.

$ ./bin/storm kill wordcount

1696 [main] INFO  o.a.s.c.kill-topology - Killed topology: wordcount


The topology disappears from the list in the UI.


$ curl http://localhost:8080/api/v1/topology/summary

{"topologies":[],"schedulerDisplayResource":false}



If you open http://localhost:8080/index.html in a web browser, the wordcount topology is either no longer visible, or still shown but in the KILLED state.

Even if the wordcount topology is visible for a moment, it eventually disappears.





The worker daemons are only all terminated once Nimbus is stopped. The risk of Nimbus being a single point of failure (SPOF) is quite real.





* Introduction to Storm


Korean-language material, as usual:

http://d2.naver.com/helloworld/484148


http://bcho.tistory.com/994 


The big picture:

https://blog.twitter.com/engineering/en_us/a/2011/a-storm-is-coming-more-details-and-plans-for-release.html


(Paper) the Storm internals part:

http://db.cs.berkeley.edu/cs286/papers/storm-sigmod2014.pdf




* A post by 임정택, an Apache Storm committer


https://medium.com/@heartsavior/%EC%96%B4%EC%A9%8C%EB%8B%A4-%EB%82%98%EB%8A%94-open-source-committer-%EA%B0%80-%EB%90%98%EC%97%88%EB%82%98-3-apache-storm-and-more-7baf4d68cd20





* Twitter is building Heron to replace Storm

Better performance, plus support for cloud environments (YARN, Mesos, Aurora)


Heron is designed with the goal of operating in a cloud environment on top of a scheduling framework such as Aurora or YARN (although it can also run in local mode). As a result, it leverages the resource isolation mechanisms implemented by these frameworks. Storm, on the other hand implements parts of the functionality of the Heron Resource Manager, the Heron Scheduler and the underlying scheduling framework in the same abstraction.


https://www.infoq.com/news/2015/06/twitter-storm-heron 


https://blog.twitter.com/engineering/en_us/a/2015/flying-faster-with-twitter-heron.html


https://twitter.github.io/heron/docs/concepts/architecture/


https://blog.acolyer.org/2017/06/29/twitter-heron-towards-extensible-streaming-engines/


http://dl.acm.org/citation.cfm?id=2742788



Each Cassandra node is assigned a specific token range and is responsible for a subset of all the data.

The component responsible for generating the partition key hash is called the partitioner. In other words, the partitioner is the hash function used to compute the hash of a given partition key, and it determines which node the data should live on. Cassandra provides three partitioners (a conceptual sketch follows the list below).




1. Murmur3Partitioner: the default partitioner since Cassandra 1.2. It computes a Murmur hash value to distribute data evenly across the cluster. Murmur hash values range from -2^63 to 2^63-1. It is the preferred partitioner because its hashing algorithm is fast and it performs better than the others.



2. RandomPartitioner: distributes data evenly by computing an MD5 hash of the partition key. It was the default partitioner in early versions of Cassandra. Hash values range from 0 to 2^127-1.



3. ByteOrderedPartitioner: keeps data distributed in lexical order of the raw key bytes. It is not used in recent versions of Cassandra (2.x) and is expected to go away, but it has not been removed because it has been in use for a long time. It is best avoided because it tends to cause hotspots and uneven load balancing.
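
To make the idea concrete, below is a minimal, purely conceptual Java sketch (not Cassandra code; the class name and the MD5-based stand-in hash are made up for illustration) of what a partitioner does: hash the partition key into a token, then pick the node whose token range covers it.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.TreeMap;

// Conceptual sketch only: map a partition key to a token, then to the owning node.
public class PartitionerSketch {
    // token -> node that owns the range ending at that token (toy ring, one token per node)
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public void addNode(String nodeName, long token) {
        ring.put(token, nodeName);
    }

    // Stand-in hash playing the role of Murmur3/MD5: first 8 bytes of an MD5 digest as a long.
    public long token(String partitionKey) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
        long token = 0;
        for (int i = 0; i < 8; i++) {
            token = (token << 8) | (digest[i] & 0xffL);
        }
        return token;
    }

    // The owning node: the first node whose token is >= the key's token,
    // wrapping around to the smallest token if none is larger.
    public String ownerOf(String partitionKey) throws Exception {
        Long t = ring.ceilingKey(token(partitionKey));
        return ring.get(t != null ? t : ring.firstKey());
    }

    public static void main(String[] args) throws Exception {
        PartitionerSketch p = new PartitionerSketch();
        p.addNode("node1", Long.MIN_VALUE / 2);
        p.addNode("node2", 0L);
        p.addNode("node3", Long.MAX_VALUE / 2);
        System.out.println(p.ownerOf("user:42"));   // always the same node for the same key
    }
}

The point is only that the partitioner is a deterministic hash: the same partition key always produces the same token, so it always lands on the same node, and a good hash spreads keys evenly over the ring.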




https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html



Thread pool types

The following are the types of thread pools and their respective parameters:

fixed

The fixed thread pool holds a fixed size of threads to handle the requests with a queue (optionally bounded) for pending requests that have no threads to service them.

The size parameter controls the number of threads, and defaults to the number of cores times 5.

The queue_size parameter controls the size of the queue of pending requests that have no threads to execute them. By default, it is set to -1, which means it is unbounded. When a request comes in and the queue is full, the request is aborted.

thread_pool:
    index:
        size: 30
        queue_size: 1000
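
As a rough analogy (not Elasticsearch code; the class name is made up for illustration), a fixed pool with a bounded queue that rejects work when the queue is full behaves like a JDK ThreadPoolExecutor configured along these lines:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolAnalogy {
    public static void main(String[] args) {
        // 30 threads, a bounded queue of 1000 pending tasks, and AbortPolicy so a task
        // submitted while the queue is full is rejected (analogous to Elasticsearch
        // aborting the request).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                30, 30, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000),
                new ThreadPoolExecutor.AbortPolicy());

        pool.submit(() -> System.out.println("indexing request"));
        pool.shutdown();
    }
}

The scaling pool described next maps loosely onto the same executor's core/max thread counts and keepAliveTime.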

scaling

The scaling thread pool holds a dynamic number of threads. This number is proportional to the workload and varies between the value of the core and max parameters.

The keep_alive parameter determines how long a thread should be kept around in the thread pool without it doing any work.

thread_pool:
    warmer:
        core: 1
        max: 8
        keep_alive: 2m

