[scala] Product 이해하기

scala 2017. 8. 10. 16:20

case class는 정말 scala/spark 코딩할 때 없으면 안되는 괜찮은 클래스이다.

정체를 알게 되면서 Product를 알게 되 었는 데 정리차 글을 정리한다. 




scala 컴파일러는 타입을 추론하는데..


아래 예제를 보면 Animal이라는 트레이트를 믹싱해서 사용하고 있는데, 

최종 결과를 보면 Product with Serializble with Animal이다. 



scala> trait Animal

defined trait Animal


scala> trait FurryAnimal extends Animal

defined trait FurryAnimal


scala> case class Dog(name:String) extends Animal

defined class Dog


scala> case class Cat(name:String) extends Animal

defined class Cat


scala> val x = Array(Dog("Fido"),Cat("Felix"))

x: Array[Product with Serializable with Animal] = Array(Dog(Fido), Cat(Felix))





만약 Animal 트레이트에 직접 Product를 상속하고 Serialable을 믹스인했다면.. 스칼라 컴파일러는 명확하게 Animal타입으로 인식한다. 


scala> trait Animal extends Product with Serializable

defined trait Animal


scala> case class Dog(name: String) extends Animal

defined class Dog


scala> case class Cat(name: String) extends Animal

defined class Cat


scala> Array(Dog("d"), Cat("c"))

res0: Array[Animal] = Array(Dog(d), Cat(c))




https://stackoverflow.com/a/36526557의 핵심 내용을 정리해본다. 


스칼라의 case class는 다음 특징을 갖고 있다. 


1. Product를 자동으로 상속한다.

2. Serializable을 상속한다

3. 패턴 매치에 쓰이기 위해 hashCode와 equals를 상속한다.

4. 타입 분해를 위해 apply와 unapply 메소드를 지원한다. 



case class는 ADT(Product)의 표현하는 방식이다. 

(그래서 분해 되고 패턴매칭 쓰고 Serializable되니 많이 사용될 수 밖에 없다)



참고로 case object 뿐 아니라 case object도 동일한 Product를 상속받은 스칼라 추론이 발생한다. 



scala> trait Animal

defined trait Animal


scala> case object Dog extends Animal

defined object Dog


scala> case object Cat extends Animal

defined object Cat


scala> val animals = List(Dog, Cat)

animals: List[Product with Serializable with Animal] = List(Dog, Cat)




현상은 case case와 동일하다. 


scala> trait Animal extends Product with Serializable

defined trait Animal


scala>  case object Dog extends Animal

defined object Dog


scala> case object Cat extends Animal

defined object Cat


scala>  val animals = List(Dog, Cat)

animals: List[Animal] = List(Dog, Cat)



Product 내부 코드를 살펴보면 다음과 같다.scala.Equals를 믹스인하고 있다.


package scala
trait Product extends scala.Any with scala.Equals {
def productElement(n : scala.Int) : scala.Any
def productArity : scala.Int
def productIterator : scala.Iterator[scala.Any] = { /* compiled code */ }
def productPrefix : java.lang.String = { /* compiled code */ }
}

scala.Equals는 다음과 같다. Product에는 equals를 갖고 있다. 

package scala
trait Equals extends scala.Any {
def canEqual(that : scala.Any) : scala.Boolean
def equals(that : scala.Any) : scala.Boolean
}


Product의 특징을 살펴본다. productArity, productElement, productPrefix, productIterator 메소드를 확인할 수 있다. 


scala> case class Member(id: Integer, lastname: String, firstName: String)

defined class Member


scala> val samuel = Member(1, "YongHwan", "Kim")

samuel: Member = Member(1,YongHwan,Kim)


scala> samuel.productArity

res0: Int = 3


scala> samuel.productElement(0)

res1: Any = 1


scala> samuel.productElement(1)

res2: Any = YongHwan


scala> samuel.productElement(2)

res3: Any = Kim


scala> samuel.productPrefix

res5: String = Member


scala> samuel.productIterator.foreach(println)

1

YongHwan

Kim




마지막 라인을 기억해두며..List와 Map도 모두 Product를 상속받았음을 알 수 있다. 

scala> val a = List("a", "b")
a: List[String] = List(a, b)

scala> a.productIterator.foreach(println)
a
List(b)

scala> val map = 1 -> "a"
map: (Int, String) = (1,a)

scala> map.productIterator.foreach(println)
1
a


좀 더 고급으로가면.. 스칼라 Reflection 코드도 만나게 된다. 결국 Product는 나름 상위 클래스로 사용되고 있음을 알려주는 코드라 할 수 있다. 참고로(ClassTag와 TypeTag는 Reflection 클래스이다.)

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala



 protected def withParquetFile[T <: Product: ClassTag: TypeTag]

      (data: Seq[T])

      (f: String => Unit): Unit = {

    withTempPath { file =>

      spark.createDataFrame(data).write.parquet(file.getCanonicalPath)

      f(file.getCanonicalPath)

    }

  }


Posted by '김용환'
,


카산드라에서 UUID를 생성하는 방법은 (좀 적은 개수의 로우를 가진) 아무 테이블에서 LIMIT 1로 select now()를 호출한다. 



cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 08376da0-7c8e-11e7-95fc-eb95772c8baf


(1 rows)

cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 0e2d4ea0-7c8e-11e7-95fc-eb95772c8baf


(1 rows)

cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 0eee0690-7c8e-11e7-95fc-eb95772c8baf


(1 rows)

cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 0fa63300-7c8e-11e7-95fc-eb95772c8baf


(1 rows)

cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 1053b110-7c8e-11e7-95fc-eb95772c8baf





만약 테이블에 now()로 저장된 UUID 필드를 갖고 있다면 다양한 포맷으로 볼 수 있다.

SELECT id, toDate(id), unixtimestampof(id), toTimestamp(id)  FROM "user_status_updates"  limit 1;

 id                                   | system.todate(id) | system.unixtimestampof(id) | system.totimestamp(id)
--------------------------------------+-------------------+----------------------------+---------------------------------
 97719c50-e797-11e3-90ce-5f98e903bf02 |        2014-05-30 |              1401412401813 | 2014-05-30 01:13:21.813000+0000




자세한 내용은 다음을 참조한다.
http://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html





Posted by '김용환'
,

전체 keyspace를 덤프뜨려면 다음과 같이 진행한다.



 



$ ./bin/cqlsh -e "desc schema"


CREATE KEYSPACE users WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;


CREATE TABLE users. follow_relation (

...

}



파일로 저장하려면 다음과 같이 진행한다.


$ ./bin/cqlsh -e "desc schema" > schema.cql




특정 keyspace만 파일로 저장하려면 다음과 같이 진행한다.



$ ./bin/cqlsh -e "desc keyspace my_status" > my_status.cql

$ cat schema.cql

CREATE KEYSPACE my_status WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;


CREATE TABLE my_status.follow_relation (

    followed_username text,

    follower_username text,

....

}




생성된 keyspace 파일을 import하는 방법은 cqlsh에 들어가서 source 명령을 사용하면 된다. 


$./bin/cqlsh

Connected to Test Cluster at 127.0.0.1:9042.

[cqlsh 5.0.1 | Cassandra 3.10 | CQL spec 3.4.4 | Native protocol v4]

Use HELP for help.

cqlsh> source 'schema.cql'

cqlsh> use my_status;

cqlsh:my_status> describe my_status;

Posted by '김용환'
,


spark 코딩을 할 때 깊이 생각안하고 대충 짠 것을 후회했다. 그냥 동작만 되길 바라면서 했던 것들이 많이 기억났다. 




spark의 coursera 강의 중 wide dependency와 narrow dependency에 대한 설명이 나오는데, 많은 영감을 주어서 잘 펌질해본다.





https://github.com/rohitvg/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies





Transformations with (usually) Narrow dependencies:

  • map
  • mapValues
  • flatMap
  • filter
  • mapPartitions
  • mapPartitionsWithIndex

Transformations with (usually) Wide dependencies: (might cause a shuffle)

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce










Posted by '김용환'
,


카산드라(cassandra)에서 IN과 ORDER BY를 함께 싸용하면 다음과 같은 에러가 발생할 수 있다. 

(참고로 ORDER BY 다음에는 클러스터링 키를 사용함으로서, 원하는 대로 파티션 키와 상관없이 생성 시간을 내림차순으로 결과를 얻을 수 있다)


InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot page queries with both ORDER BY and a IN restriction on the partition key; you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"



이 때에는 PAGING OFF라는 커맨드를 사용하면 에러가 발생하지 않고 정상적으로 동작한다.



Posted by '김용환'
,


RDD에 partitonBy 메소드를 호출하면서 Partitioner를 정할 수 있다. 

기본 Partitioner(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/Partitioner.html)로는 HashPartitioner, RangePartitioner가 존재한다. 



우선 HashPartitioner를 사용한다. 파티셔닝을 해쉬로 퍼트릴 수 있기 때문에 유용하다. 



먼저 5개의 파티션으로 RDD를 생성했다가 Partitioning을 3개의 HashPartitioner를 사용하는 예제이다. 




scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)), 5)

pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24


scala> pairs.partitioner

res1: Option[org.apache.spark.Partitioner] = None


scala> import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner


scala> val partitioned = pairs.partitionBy(new HashPartitioner(3)).persist()

partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at <console>:27


scala> partitioned.collect

res2: Array[(Int, Int)] = Array((2,2), (1,1), (3,3))


scala> pairs.partitions.length

res7: Int = 5


scala> partitioned.partitions.length

res8: Int = 3


scala> pairs.partitions

res5: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@6ba, org.apache.spark.rdd.ParallelCollectionPartition@6bb, org.apache.spark.rdd.ParallelCollectionPartition@6bc, org.apache.spark.rdd.ParallelCollectionPartition@6bd, org.apache.spark.rdd.ParallelCollectionPartition@6be)


scala> partitioned.partitions

res6: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ShuffledRDDPartition@0, org.apache.spark.rdd.ShuffledRDDPartition@1, org.apache.spark.rdd.ShuffledRDDPartition@2)



persist()는 shuffle을 이미 되도록 해놓기 때문에 성능상 이점을 가진다. 실무에서 사용할 때 유용한 팁이다. 



참고로 RDD.toDebugString() 메소드가 존재하는데 shuffle RDD인지 아닌지를 파악할 때 도움이 된다. 



scala> partitioned.toDebugString

res11: String =

(3) ShuffledRDD[8] at partitionBy at <console>:27 [Memory Deserialized 1x Replicated]

 |       CachedPartitions: 3; MemorySize: 192.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B

 +-(5) ParallelCollectionRDD[7] at parallelize at <console>:24 [Memory Deserialized 1x Replicated]


scala> pairs.toDebugString

res13: String = (5) ParallelCollectionRDD[7] at parallelize at <console>:24 []






다음은 RangePartitioner 예제이다. 내용은 비슷해보인다.



scala> import org.apache.spark.RangePartitioner

import org.apache.spark.RangePartitioner


scala> new RangePartitioner(3, pairs)

res9: org.apache.spark.RangePartitioner[Int,Int] = org.apache.spark.RangePartitioner@7d2d


scala> val rangePartitioned = pairs.partitionBy(new RangePartitioner(3, pairs)).persist()

rangePartitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[8] at partitionBy at <console>:28


scala> rangePartitioned.collect

res10: Array[(Int, Int)] = Array((1,1), (2,2), (3,3))


scala> rangePartitioned.partitions.length

res11: Int = 3



RangePartitioner API(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/RangePartitioner.html)를 살펴보면, ordering와 정렬순서(오름차순/내림차순)으로 할 수 있는 형태가 있다. HashPartitioner와 크게 다른 내용이라 할 수 있을 듯 싶다. 

소스 : https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala

public RangePartitioner(int partitions,
                RDD<? extends scala.Product2<K,V>> rdd,
                boolean ascending,
                scala.math.Ordering<K> evidence$1,
                scala.reflect.ClassTag<K> evidence$2)





'scala' 카테고리의 다른 글

[scala] Product 이해하기  (0) 2017.08.10
[spark] [펌질] wide dependecy, narrow dependency  (0) 2017.08.08
[spark2] cache()와 persist()의 차이  (0) 2017.08.01
[scala] scalatest에서 Exception 처리  (0) 2017.07.27
[scala] scalablitz  (0) 2017.07.27
Posted by '김용환'
,

일래스틱서치에 필드 캐시의 expire를 설정하는 옵션(indices.fielddata.cache.expire )이 1.x 버전에 있었지만 2.0부터는 사라졌다.



https://www.elastic.co/guide/en/elasticsearch/reference/1.4/index-modules-fielddata.html



indices.fielddata.cache.expire

[experimentalThis functionality is experimental and may be changed or removed completely in a future release. Elastic will take a best effort approach to fix any issues, but experimental features are not subject to the support SLA of official GA features.A time based setting that expires field data after a certain time of inactivity. Defaults to -1. For example, can be set to 5m for a 5 minute expiry.


이 기능이 gc를 많이 유발하고 crash를 일으키는 이슈가 있어서 사라진 듯 하다..

 https://discuss.elastic.co/t/indices-fielddata-cache-expire/1183



1.4에서는 잘 사용해서 문제가 없었지만. 결국 사라진 것으로 봐서는 큰 gc 이슈를 일으킨 것으로 보인다..

어차피 2.0에서 사라졌으니.. 히스토리를 위해서 남겨둔다.




Posted by '김용환'
,




elasticsearch 1.x의 메모리 구조이다.  정말 잘 설명되어 있는 이미지가 있어서 펌한다.



https://kupczynski.info/2015/04/06/fielddata.html



좀 더 크게 보면 다음과 같다.




Posted by '김용환'
,

Spark에서는 연산할 때 스토리 레벨에 따라 지원하는 api, cache()와 persist()가 존재한다. 



RDD에 cache를 저장한 예제를 살펴본다.

scala> val c = sc.parallelize(List("samuel"), 2)

c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> c.getStorageLevel

res0: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)


scala> c.cache

res1: c.type = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> c.getStorageLevel

res2: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)




cache()는 기본 저장소 레벨이 MEMORY_ONLY로만으로 사용된다. 





이번에는 persist() 예제를 진행한다. persist()는 다음 스토리지 레벨에 맞게 사용할 수 있다. 

여기에서 SER은 serialized을 의미한다. disk 저장 위치는 로컬이다. 


* 크게 분류된 스토리지 레벨(Storage Level)


Level

Space used

cpu time 

In memory 

 On disk

Serialized 

MEMORY_ONLY 

High

Low 

 N

MEMORY_ONLY_SER

Low 

High 

 N Y

MEMORY_AND_DISK

High

Medium 

Some 

 Some Some

MEMORY_AND_DISK_SER 

Low 

High 

Some 

 Some Y

DISK_ONLY 

Low 

High 


 Y Y



scala> import org.apache.spark.storage.StorageLevel;

import org.apache.spark.storage.StorageLevel


// 기존 c를 활용.


scala> c.persist(StorageLevel.MEMORY_ONLY_SER)

res4: c.type = ParallelCollectionRDD[1] at parallelize at <console>:24


scala> c.getStorageLevel

res5: org.apache.spark.storage.StorageLevel = StorageLevel(memory, 1 replicas)





scala> val c = sc.parallelize(List("samuel"), 2)

c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:25


scala> c.persist(StorageLevel.MEMORY_AND_DISK)

res7: c.type = ParallelCollectionRDD[2] at parallelize at <console>:25


scala> c.getStorageLevel

res8: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)



참고로 rdd에 persist를 사용하고 다시 persist를 사용하면 에러가 발생한다.

scala> c.persist(StorageLevel.MEMORY_AND_DISK)
java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
  at org.apache.spark.rdd.RDD.persist(RDD.scala:169)
  at org.apache.spark.rdd.RDD.persist(RDD.scala:194)
  ... 48 elided




스토리지 레벨에 대한 공식 문서 내용은 다음과 같다. 

https://spark.apache.org/docs/latest/rdd-programming-guide.html


Storage Level         | Meaning

--------------------------------

MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

MEMORY_AND_DISK  |  Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

MEMORY_ONLY_SER  |  Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

DISK_ONLY         | Store the RDD partitions only on disk.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.  | Same as the levels above, but replicate each partition on two cluster nodes.

OFF_HEAP (experimental)  | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.





원본 데이터 저장과 serialized의 성능 차이는 어느 개발자가 써놓은 내용이 있다..

출처 : http://sujee.net/wp-content/uploads/2015/01/spark-caching-1.png





또한, persist를 잘못 사용하면 애플리케이션이 종료해도 메모리를 계속 사용하는 문제가 발생할 수 있다고 한다.

출처 : http://tomining.tistory.com/84




따라서 애플리케이션 내에서 persist()호출이 된 rdd에 unpersist()를 호출해야 한다.!

scala> c.unpersist()
res10: c.type = ParallelCollectionRDD[2] at parallelize at <console>:25



Posted by '김용환'
,




핫 스레드 API는 여러 정보를 포함한 형태를 가진 텍스트로 리턴한다. 즉 JSON 구조로 리턴하지 않는 형태를 갖고 있다. 


응답 구조 자체에 대해 설명하기 전에 핫 스레드 API의 응답을 생성하는 로직을 짧게 소개한다.


일래스틱서치는 먼저 실행 중인 모든 스레드를 얻은 후 각 스레드에서 소비한 CPU 시간, 특정 스레드가 차단되었거나 대기 상태에 있었던 횟수, 차단된 시간 또는 대기 상태에 있었던 시간 등에 대한 다양한 정보를 수집한다. 


다음에는 특정 시간(interval 매개 변수로 지정) 동안 기다린 후 시간이 지나면 동일한 정보를 다시 수집한다.


이 작업이 완료되면 각 특정 스레드가 실행되고 있는 시간에 따라 스레드가 정렬된다. 가장 오랜 기간 실행 중인 스레드가 목록 맨 위에 오도록 내림차순으로 정렬된다.



(이전에 언급된 시간은 type 매개 변수에 지정된 오퍼레이션 타입을 기반으로 측정된다. )


그 다음 일래스틱서치는 첫 번째 N개의 스레드(N은 threads 매개 변수로 지정된 스레드 개수)를 분석한다. 


일래스틱서치는 몇 밀리 초마다 이전 단계에서 선택한 스레드의 스택 트레이스(stack trace)의 일부 스냅샷(스냅 샷 수는 스냅 샷 매개 변수로 지정)을 사용한다. 


마지막으로 해야 할 일은 스레드 상태의 변경을 시각화하고, 호출 함수에게 응답을 리턴하기 위해 스택 트레이스를 그룹핑하는 것이다.




threads 개수는 기본 3개이고 간격은 500ms이며 type의 기본 값은 cpu이다. 

간단한 예제를 보면 다음과 같다.

$ curl 'localhost:9200/_nodes/hot_threads?type=wait&interval=1s'
::: {5OEGj_a}{5OEGj_avT8un0nOak28qQg}{DAzM0ktKQNS047ggd9nYZQ}{127.0.0.1}{127.0.0.1:9300}
   Hot threads at 2017-07-31T11:04:59.943Z, interval=1s, busiestThreads=3, ignoreIdleThreads=true:

    8.4% (35.1ms out of 1000ms) cpu usage by thread 'elasticsearch[kemi][search][T#2]'
     10/10 snapshots sharing following 8 elements
       sun.misc.Unsafe.park(Native Method)
       java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
       java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
       java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
       java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
       java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
       org.elasticsearch.bootstrap.Bootstrap$1.run(Bootstrap.java:84)
       java.lang.Thread.run(Thread.java:745)

....



결과의 첫 부분을 보면..


핫 스레드 API 정보를 리턴하는 노드가 어느 노드인지 쉽게 알 수 있고 핫 스레드 API 호출이 언제 많은 노드로 전달되는 시점을 알 수 있다.





두 번째 부분은 


8.4% (35.1ms out of 1000ms) cpu usage by thread 'elasticsearch[kemi][search][T#2]'


해당 스레드는 측정이 완료된 시점의 모든 CPU 시간 중 8.4%를 차지함을 알 수 있다. 

cpu usage 부분은 cpu와 동일한 type을 사용하고 있음을 나타낸다 (여기에서 예상할 수 있는 다른 값은 블럭(block) 상태에 있는 스레드의 블럭 사용량(block usage)와 대기 상태에 있는 스레드의 대기 사용량(wait usage)이다).  스레드 이름은 여기에서 매우 중요하다. 

스레드를 살펴보면 해당 일래스틱서치 스레드가 가장 핫한 스레드임을 알 수 있다. 이 예제의 핫 스레드가 모두 검색(search 값)이라는 것을 알 수 있다. 

볼 수 있는 다른 값으로는 recovery_stream(복구 모듈 이벤트), cache(이벤트 캐시), merge(세그먼트 병합), index(데이터 저장 스레드) 등이 있다.



관련 내용은 다음 코드를 확인한다.


https://github.com/elastic/elasticsearch/blob/v5.2.1/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java



public class NodesHotThreadsRequest extends BaseNodesRequest<NodesHotThreadsRequest> {


    int threads = 3;

    String type = "cpu";

    TimeValue interval = new TimeValue(500, TimeUnit.MILLISECONDS);

    int snapshots = 10;

    boolean ignoreIdleThreads = true;


    // for serialization

    public NodesHotThreadsRequest() {


    }


    /**

     * Get hot threads from nodes based on the nodes ids specified. If none are passed, hot

     * threads for all nodes is used.

     */

    public NodesHotThreadsRequest(String... nodesIds) {

        super(nodesIds);

    }


    public int threads() {

        return this.threads;

    }


    public NodesHotThreadsRequest threads(int threads) {

        this.threads = threads;

        return this;

    }


    public boolean ignoreIdleThreads() {

        return this.ignoreIdleThreads;

    }


    public NodesHotThreadsRequest ignoreIdleThreads(boolean ignoreIdleThreads) {

        this.ignoreIdleThreads = ignoreIdleThreads;

        return this;

    }


    public NodesHotThreadsRequest type(String type) {

        this.type = type;

        return this;

    }


    public String type() {

        return this.type;

    }


    public NodesHotThreadsRequest interval(TimeValue interval) {

        this.interval = interval;

        return this;

    }


    public TimeValue interval() {

        return this.interval;

    }


    public int snapshots() {

        return this.snapshots;

    }


    public NodesHotThreadsRequest snapshots(int snapshots) {

        this.snapshots = snapshots;

        return this;

    }


    @Override

    public void readFrom(StreamInput in) throws IOException {

        super.readFrom(in);

        threads = in.readInt();

        ignoreIdleThreads = in.readBoolean();

        type = in.readString();

        interval = new TimeValue(in);

        snapshots = in.readInt();

    }


    @Override

    public void writeTo(StreamOutput out) throws IOException {

        super.writeTo(out);

        out.writeInt(threads);

        out.writeBoolean(ignoreIdleThreads);

        out.writeString(type);

        interval.writeTo(out);

        out.writeInt(snapshots);

    }

}


Posted by '김용환'
,