spark에서 groupByKey를 사용할 때 성능에 많이 떨어질 수 있다. 


좋은 설명을 포함한 링크가 있다.


https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html



groupByKey를 살펴보면, 키 값으로 분류를 하고 모든 계산을 하나씩 진행한다. 따라서 모든 데이터 복사가 많이 일어날 수 있다. 



반면 reduceByKey에서는 계산을 진행할때 데이터 셔플 전에 노드 내에서 조금 계산해놓는다. 따라서 불필요한 데이터가 전달되지 않기 때문에 네트웍 트래픽, 복사 비용이 groupByKey보다 줄어든다.





마치 map/reduce의 custom combiner와 비슷한 느낌으로 동작한다.




http://www.admin-magazine.com/HPC/Articles/MapReduce-and-Hadoop




Posted by '김용환'
,



RDD map을 사용 하기전에 특정 라인(예, 첫번째 라인)을 사용하고 싶지 않다면 다음과 같은 mapPartitionWithIndex()를 사용한다. 


rdd.mapPartitionsWithIndex(

(i, iterator) => if (i == 0) iterator.drop(1) else iterator)

예제는 다음과 같다.




scala> val rdd = sc.parallelize(List("samuel", "kyle", "jun", "ethan", "crizin"), 5)

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> rdd.mapPartitionsWithIndex((i, iterator) => if (i == 0) iterator.drop(1) else iterator).foreach(println)

kyle

crizin

ethan

jun


scala> rdd.mapPartitionsWithIndex((i, iterator) => if (i % 2 == 0) iterator.drop(1) else iterator).foreach(println)

kyle

ethan



Posted by '김용환'
,


cassandra 핵심 내용중 batch log에 대한 내용이 아래 url에 적혀 있다. 


https://www.datastax.com/dev/blog/atomic-batches-in-cassandra-1-2



batchlog 테이블은 node local이다.. 


The batchlog table is node-local, along with the rest of the system keyspace.


노드 로컬(node-local) : 배치가 실행되는 노드에서 배치 로그가 저장된다는 것을 의미한다.




Posted by '김용환'
,

[scala] Product 이해하기

scala 2017. 8. 10. 16:20

case class는 정말 scala/spark 코딩할 때 없으면 안되는 괜찮은 클래스이다.

정체를 알게 되면서 Product를 알게 되 었는 데 정리차 글을 정리한다. 




scala 컴파일러는 타입을 추론하는데..


아래 예제를 보면 Animal이라는 트레이트를 믹싱해서 사용하고 있는데, 

최종 결과를 보면 Product with Serializble with Animal이다. 



scala> trait Animal

defined trait Animal


scala> trait FurryAnimal extends Animal

defined trait FurryAnimal


scala> case class Dog(name:String) extends Animal

defined class Dog


scala> case class Cat(name:String) extends Animal

defined class Cat


scala> val x = Array(Dog("Fido"),Cat("Felix"))

x: Array[Product with Serializable with Animal] = Array(Dog(Fido), Cat(Felix))





만약 Animal 트레이트에 직접 Product를 상속하고 Serialable을 믹스인했다면.. 스칼라 컴파일러는 명확하게 Animal타입으로 인식한다. 


scala> trait Animal extends Product with Serializable

defined trait Animal


scala> case class Dog(name: String) extends Animal

defined class Dog


scala> case class Cat(name: String) extends Animal

defined class Cat


scala> Array(Dog("d"), Cat("c"))

res0: Array[Animal] = Array(Dog(d), Cat(c))




https://stackoverflow.com/a/36526557의 핵심 내용을 정리해본다. 


스칼라의 case class는 다음 특징을 갖고 있다. 


1. Product를 자동으로 상속한다.

2. Serializable을 상속한다

3. 패턴 매치에 쓰이기 위해 hashCode와 equals를 상속한다.

4. 타입 분해를 위해 apply와 unapply 메소드를 지원한다. 



case class는 ADT(Product)의 표현하는 방식이다. 

(그래서 분해 되고 패턴매칭 쓰고 Serializable되니 많이 사용될 수 밖에 없다)



참고로 case object 뿐 아니라 case object도 동일한 Product를 상속받은 스칼라 추론이 발생한다. 



scala> trait Animal

defined trait Animal


scala> case object Dog extends Animal

defined object Dog


scala> case object Cat extends Animal

defined object Cat


scala> val animals = List(Dog, Cat)

animals: List[Product with Serializable with Animal] = List(Dog, Cat)




현상은 case case와 동일하다. 


scala> trait Animal extends Product with Serializable

defined trait Animal


scala>  case object Dog extends Animal

defined object Dog


scala> case object Cat extends Animal

defined object Cat


scala>  val animals = List(Dog, Cat)

animals: List[Animal] = List(Dog, Cat)



Product 내부 코드를 살펴보면 다음과 같다.scala.Equals를 믹스인하고 있다.


package scala
trait Product extends scala.Any with scala.Equals {
def productElement(n : scala.Int) : scala.Any
def productArity : scala.Int
def productIterator : scala.Iterator[scala.Any] = { /* compiled code */ }
def productPrefix : java.lang.String = { /* compiled code */ }
}

scala.Equals는 다음과 같다. Product에는 equals를 갖고 있다. 

package scala
trait Equals extends scala.Any {
def canEqual(that : scala.Any) : scala.Boolean
def equals(that : scala.Any) : scala.Boolean
}


Product의 특징을 살펴본다. productArity, productElement, productPrefix, productIterator 메소드를 확인할 수 있다. 


scala> case class Member(id: Integer, lastname: String, firstName: String)

defined class Member


scala> val samuel = Member(1, "YongHwan", "Kim")

samuel: Member = Member(1,YongHwan,Kim)


scala> samuel.productArity

res0: Int = 3


scala> samuel.productElement(0)

res1: Any = 1


scala> samuel.productElement(1)

res2: Any = YongHwan


scala> samuel.productElement(2)

res3: Any = Kim


scala> samuel.productPrefix

res5: String = Member


scala> samuel.productIterator.foreach(println)

1

YongHwan

Kim




마지막 라인을 기억해두며..List와 Map도 모두 Product를 상속받았음을 알 수 있다. 

scala> val a = List("a", "b")
a: List[String] = List(a, b)

scala> a.productIterator.foreach(println)
a
List(b)

scala> val map = 1 -> "a"
map: (Int, String) = (1,a)

scala> map.productIterator.foreach(println)
1
a


좀 더 고급으로가면.. 스칼라 Reflection 코드도 만나게 된다. 결국 Product는 나름 상위 클래스로 사용되고 있음을 알려주는 코드라 할 수 있다. 참고로(ClassTag와 TypeTag는 Reflection 클래스이다.)

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala



 protected def withParquetFile[T <: Product: ClassTag: TypeTag]

      (data: Seq[T])

      (f: String => Unit): Unit = {

    withTempPath { file =>

      spark.createDataFrame(data).write.parquet(file.getCanonicalPath)

      f(file.getCanonicalPath)

    }

  }


Posted by '김용환'
,