spark에서 groupByKey를 사용할 때 성능에 많이 떨어질 수 있다. 


좋은 설명을 포함한 링크가 있다.


https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html



groupByKey를 살펴보면, 키 값으로 분류를 하고 모든 계산을 하나씩 진행한다. 따라서 모든 데이터 복사가 많이 일어날 수 있다. 



반면 reduceByKey에서는 계산을 진행할때 데이터 셔플 전에 노드 내에서 조금 계산해놓는다. 따라서 불필요한 데이터가 전달되지 않기 때문에 네트웍 트래픽, 복사 비용이 groupByKey보다 줄어든다.





마치 map/reduce의 custom combiner와 비슷한 느낌으로 동작한다.




http://www.admin-magazine.com/HPC/Articles/MapReduce-and-Hadoop




Posted by '김용환'
,



RDD map을 사용 하기전에 특정 라인(예, 첫번째 라인)을 사용하고 싶지 않다면 다음과 같은 mapPartitionWithIndex()를 사용한다. 


rdd.mapPartitionsWithIndex(

(i, iterator) => if (i == 0) iterator.drop(1) else iterator)

예제는 다음과 같다.




scala> val rdd = sc.parallelize(List("samuel", "kyle", "jun", "ethan", "crizin"), 5)

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> rdd.mapPartitionsWithIndex((i, iterator) => if (i == 0) iterator.drop(1) else iterator).foreach(println)

kyle

crizin

ethan

jun


scala> rdd.mapPartitionsWithIndex((i, iterator) => if (i % 2 == 0) iterator.drop(1) else iterator).foreach(println)

kyle

ethan



Posted by '김용환'
,


cassandra 핵심 내용중 batch log에 대한 내용이 아래 url에 적혀 있다. 


https://www.datastax.com/dev/blog/atomic-batches-in-cassandra-1-2



batchlog 테이블은 node local이다.. 


The batchlog table is node-local, along with the rest of the system keyspace.


노드 로컬(node-local) : 배치가 실행되는 노드에서 배치 로그가 저장된다는 것을 의미한다.




Posted by '김용환'
,

[scala] Product 이해하기

scala 2017. 8. 10. 16:20

case class는 정말 scala/spark 코딩할 때 없으면 안되는 괜찮은 클래스이다.

정체를 알게 되면서 Product를 알게 되 었는 데 정리차 글을 정리한다. 




scala 컴파일러는 타입을 추론하는데..


아래 예제를 보면 Animal이라는 트레이트를 믹싱해서 사용하고 있는데, 

최종 결과를 보면 Product with Serializble with Animal이다. 



scala> trait Animal

defined trait Animal


scala> trait FurryAnimal extends Animal

defined trait FurryAnimal


scala> case class Dog(name:String) extends Animal

defined class Dog


scala> case class Cat(name:String) extends Animal

defined class Cat


scala> val x = Array(Dog("Fido"),Cat("Felix"))

x: Array[Product with Serializable with Animal] = Array(Dog(Fido), Cat(Felix))





만약 Animal 트레이트에 직접 Product를 상속하고 Serialable을 믹스인했다면.. 스칼라 컴파일러는 명확하게 Animal타입으로 인식한다. 


scala> trait Animal extends Product with Serializable

defined trait Animal


scala> case class Dog(name: String) extends Animal

defined class Dog


scala> case class Cat(name: String) extends Animal

defined class Cat


scala> Array(Dog("d"), Cat("c"))

res0: Array[Animal] = Array(Dog(d), Cat(c))




https://stackoverflow.com/a/36526557의 핵심 내용을 정리해본다. 


스칼라의 case class는 다음 특징을 갖고 있다. 


1. Product를 자동으로 상속한다.

2. Serializable을 상속한다

3. 패턴 매치에 쓰이기 위해 hashCode와 equals를 상속한다.

4. 타입 분해를 위해 apply와 unapply 메소드를 지원한다. 



case class는 ADT(Product)의 표현하는 방식이다. 

(그래서 분해 되고 패턴매칭 쓰고 Serializable되니 많이 사용될 수 밖에 없다)



참고로 case object 뿐 아니라 case object도 동일한 Product를 상속받은 스칼라 추론이 발생한다. 



scala> trait Animal

defined trait Animal


scala> case object Dog extends Animal

defined object Dog


scala> case object Cat extends Animal

defined object Cat


scala> val animals = List(Dog, Cat)

animals: List[Product with Serializable with Animal] = List(Dog, Cat)




현상은 case case와 동일하다. 


scala> trait Animal extends Product with Serializable

defined trait Animal


scala>  case object Dog extends Animal

defined object Dog


scala> case object Cat extends Animal

defined object Cat


scala>  val animals = List(Dog, Cat)

animals: List[Animal] = List(Dog, Cat)



Product 내부 코드를 살펴보면 다음과 같다.scala.Equals를 믹스인하고 있다.


package scala
trait Product extends scala.Any with scala.Equals {
def productElement(n : scala.Int) : scala.Any
def productArity : scala.Int
def productIterator : scala.Iterator[scala.Any] = { /* compiled code */ }
def productPrefix : java.lang.String = { /* compiled code */ }
}

scala.Equals는 다음과 같다. Product에는 equals를 갖고 있다. 

package scala
trait Equals extends scala.Any {
def canEqual(that : scala.Any) : scala.Boolean
def equals(that : scala.Any) : scala.Boolean
}


Product의 특징을 살펴본다. productArity, productElement, productPrefix, productIterator 메소드를 확인할 수 있다. 


scala> case class Member(id: Integer, lastname: String, firstName: String)

defined class Member


scala> val samuel = Member(1, "YongHwan", "Kim")

samuel: Member = Member(1,YongHwan,Kim)


scala> samuel.productArity

res0: Int = 3


scala> samuel.productElement(0)

res1: Any = 1


scala> samuel.productElement(1)

res2: Any = YongHwan


scala> samuel.productElement(2)

res3: Any = Kim


scala> samuel.productPrefix

res5: String = Member


scala> samuel.productIterator.foreach(println)

1

YongHwan

Kim




마지막 라인을 기억해두며..List와 Map도 모두 Product를 상속받았음을 알 수 있다. 

scala> val a = List("a", "b")
a: List[String] = List(a, b)

scala> a.productIterator.foreach(println)
a
List(b)

scala> val map = 1 -> "a"
map: (Int, String) = (1,a)

scala> map.productIterator.foreach(println)
1
a


좀 더 고급으로가면.. 스칼라 Reflection 코드도 만나게 된다. 결국 Product는 나름 상위 클래스로 사용되고 있음을 알려주는 코드라 할 수 있다. 참고로(ClassTag와 TypeTag는 Reflection 클래스이다.)

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala



 protected def withParquetFile[T <: Product: ClassTag: TypeTag]

      (data: Seq[T])

      (f: String => Unit): Unit = {

    withTempPath { file =>

      spark.createDataFrame(data).write.parquet(file.getCanonicalPath)

      f(file.getCanonicalPath)

    }

  }


Posted by '김용환'
,


카산드라에서 UUID를 생성하는 방법은 (좀 적은 개수의 로우를 가진) 아무 테이블에서 LIMIT 1로 select now()를 호출한다. 



cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 08376da0-7c8e-11e7-95fc-eb95772c8baf


(1 rows)

cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 0e2d4ea0-7c8e-11e7-95fc-eb95772c8baf


(1 rows)

cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 0eee0690-7c8e-11e7-95fc-eb95772c8baf


(1 rows)

cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 0fa63300-7c8e-11e7-95fc-eb95772c8baf


(1 rows)

cqlsh:my_status> SELECT NOW() FROM "user_status_updates" LIMIT 1;


 system.now()

--------------------------------------

 1053b110-7c8e-11e7-95fc-eb95772c8baf





만약 테이블에 now()로 저장된 UUID 필드를 갖고 있다면 다양한 포맷으로 볼 수 있다.

SELECT id, toDate(id), unixtimestampof(id), toTimestamp(id)  FROM "user_status_updates"  limit 1;

 id                                   | system.todate(id) | system.unixtimestampof(id) | system.totimestamp(id)
--------------------------------------+-------------------+----------------------------+---------------------------------
 97719c50-e797-11e3-90ce-5f98e903bf02 |        2014-05-30 |              1401412401813 | 2014-05-30 01:13:21.813000+0000




자세한 내용은 다음을 참조한다.
http://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html





Posted by '김용환'
,

전체 keyspace를 덤프뜨려면 다음과 같이 진행한다.



 



$ ./bin/cqlsh -e "desc schema"


CREATE KEYSPACE users WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;


CREATE TABLE users. follow_relation (

...

}



파일로 저장하려면 다음과 같이 진행한다.


$ ./bin/cqlsh -e "desc schema" > schema.cql




특정 keyspace만 파일로 저장하려면 다음과 같이 진행한다.



$ ./bin/cqlsh -e "desc keyspace my_status" > my_status.cql

$ cat schema.cql

CREATE KEYSPACE my_status WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;


CREATE TABLE my_status.follow_relation (

    followed_username text,

    follower_username text,

....

}




생성된 keyspace 파일을 import하는 방법은 cqlsh에 들어가서 source 명령을 사용하면 된다. 


$./bin/cqlsh

Connected to Test Cluster at 127.0.0.1:9042.

[cqlsh 5.0.1 | Cassandra 3.10 | CQL spec 3.4.4 | Native protocol v4]

Use HELP for help.

cqlsh> source 'schema.cql'

cqlsh> use my_status;

cqlsh:my_status> describe my_status;

Posted by '김용환'
,


spark 코딩을 할 때 깊이 생각안하고 대충 짠 것을 후회했다. 그냥 동작만 되길 바라면서 했던 것들이 많이 기억났다. 




spark의 coursera 강의 중 wide dependency와 narrow dependency에 대한 설명이 나오는데, 많은 영감을 주어서 잘 펌질해본다.





https://github.com/rohitvg/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies





Transformations with (usually) Narrow dependencies:

  • map
  • mapValues
  • flatMap
  • filter
  • mapPartitions
  • mapPartitionsWithIndex

Transformations with (usually) Wide dependencies: (might cause a shuffle)

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce










Posted by '김용환'
,


카산드라(cassandra)에서 IN과 ORDER BY를 함께 싸용하면 다음과 같은 에러가 발생할 수 있다. 

(참고로 ORDER BY 다음에는 클러스터링 키를 사용함으로서, 원하는 대로 파티션 키와 상관없이 생성 시간을 내림차순으로 결과를 얻을 수 있다)


InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot page queries with both ORDER BY and a IN restriction on the partition key; you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"



이 때에는 PAGING OFF라는 커맨드를 사용하면 에러가 발생하지 않고 정상적으로 동작한다.



Posted by '김용환'
,


RDD에 partitonBy 메소드를 호출하면서 Partitioner를 정할 수 있다. 

기본 Partitioner(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/Partitioner.html)로는 HashPartitioner, RangePartitioner가 존재한다. 



우선 HashPartitioner를 사용한다. 파티셔닝을 해쉬로 퍼트릴 수 있기 때문에 유용하다. 



먼저 5개의 파티션으로 RDD를 생성했다가 Partitioning을 3개의 HashPartitioner를 사용하는 예제이다. 




scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)), 5)

pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24


scala> pairs.partitioner

res1: Option[org.apache.spark.Partitioner] = None


scala> import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner


scala> val partitioned = pairs.partitionBy(new HashPartitioner(3)).persist()

partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at <console>:27


scala> partitioned.collect

res2: Array[(Int, Int)] = Array((2,2), (1,1), (3,3))


scala> pairs.partitions.length

res7: Int = 5


scala> partitioned.partitions.length

res8: Int = 3


scala> pairs.partitions

res5: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@6ba, org.apache.spark.rdd.ParallelCollectionPartition@6bb, org.apache.spark.rdd.ParallelCollectionPartition@6bc, org.apache.spark.rdd.ParallelCollectionPartition@6bd, org.apache.spark.rdd.ParallelCollectionPartition@6be)


scala> partitioned.partitions

res6: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ShuffledRDDPartition@0, org.apache.spark.rdd.ShuffledRDDPartition@1, org.apache.spark.rdd.ShuffledRDDPartition@2)



persist()는 shuffle을 이미 되도록 해놓기 때문에 성능상 이점을 가진다. 실무에서 사용할 때 유용한 팁이다. 



참고로 RDD.toDebugString() 메소드가 존재하는데 shuffle RDD인지 아닌지를 파악할 때 도움이 된다. 



scala> partitioned.toDebugString

res11: String =

(3) ShuffledRDD[8] at partitionBy at <console>:27 [Memory Deserialized 1x Replicated]

 |       CachedPartitions: 3; MemorySize: 192.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B

 +-(5) ParallelCollectionRDD[7] at parallelize at <console>:24 [Memory Deserialized 1x Replicated]


scala> pairs.toDebugString

res13: String = (5) ParallelCollectionRDD[7] at parallelize at <console>:24 []






다음은 RangePartitioner 예제이다. 내용은 비슷해보인다.



scala> import org.apache.spark.RangePartitioner

import org.apache.spark.RangePartitioner


scala> new RangePartitioner(3, pairs)

res9: org.apache.spark.RangePartitioner[Int,Int] = org.apache.spark.RangePartitioner@7d2d


scala> val rangePartitioned = pairs.partitionBy(new RangePartitioner(3, pairs)).persist()

rangePartitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[8] at partitionBy at <console>:28


scala> rangePartitioned.collect

res10: Array[(Int, Int)] = Array((1,1), (2,2), (3,3))


scala> rangePartitioned.partitions.length

res11: Int = 3



RangePartitioner API(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/RangePartitioner.html)를 살펴보면, ordering와 정렬순서(오름차순/내림차순)으로 할 수 있는 형태가 있다. HashPartitioner와 크게 다른 내용이라 할 수 있을 듯 싶다. 

소스 : https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala

public RangePartitioner(int partitions,
                RDD<? extends scala.Product2<K,V>> rdd,
                boolean ascending,
                scala.math.Ordering<K> evidence$1,
                scala.reflect.ClassTag<K> evidence$2)





'scala' 카테고리의 다른 글

[scala] Product 이해하기  (0) 2017.08.10
[spark] [펌질] wide dependecy, narrow dependency  (0) 2017.08.08
[spark2] cache()와 persist()의 차이  (0) 2017.08.01
[scala] scalatest에서 Exception 처리  (0) 2017.07.27
[scala] scalablitz  (0) 2017.07.27
Posted by '김용환'
,

일래스틱서치에 필드 캐시의 expire를 설정하는 옵션(indices.fielddata.cache.expire )이 1.x 버전에 있었지만 2.0부터는 사라졌다.



https://www.elastic.co/guide/en/elasticsearch/reference/1.4/index-modules-fielddata.html



indices.fielddata.cache.expire

[experimentalThis functionality is experimental and may be changed or removed completely in a future release. Elastic will take a best effort approach to fix any issues, but experimental features are not subject to the support SLA of official GA features.A time based setting that expires field data after a certain time of inactivity. Defaults to -1. For example, can be set to 5m for a 5 minute expiry.


이 기능이 gc를 많이 유발하고 crash를 일으키는 이슈가 있어서 사라진 듯 하다..

 https://discuss.elastic.co/t/indices-fielddata-cache-expire/1183



1.4에서는 잘 사용해서 문제가 없었지만. 결국 사라진 것으로 봐서는 큰 gc 이슈를 일으킨 것으로 보인다..

어차피 2.0에서 사라졌으니.. 히스토리를 위해서 남겨둔다.




Posted by '김용환'
,