스파크 (spark)  join은 두가지 전략이 있다.


셔플 조인(shuffle join)과 브로드 캐스트 조인(broadcast join)이 있다.


이 기반에는 wide dependency와 narrow dependency가  있다. 즉 최대한 driver와 executor 간 데이터 교환의 차이를 설명한 것으로서 개발 코드에 따라 성능이 달라진다.

(https://knight76.tistory.com/entry/spark-%ED%8E%8C%EC%A7%88-wide-dependecy-narrow-dependency)



셔플 조인은 조인된 데이터를 동일한 executor로 이동하기 위해 셔플 연산을 사용한다. 각 로우를 해시로 표현 및 생성한 후 적절한 장소에 보낸다. 따라서 데이터 이동이 많이 발생한다. 내부적으로 merge sort join보다 더 많은 비용이 드는 해싱을 사용한다.


따라서 join시 데이터  흐름이 narrow해지고 executor간의 데이터 복사 비용으로 속도가 떨어지게 된다.






브로드캐스트 조인은 데이터를  executor로 복사하지만,  executor간에 데이터복사가 일어나지 않기 때문에 속도가 빨라질 수 있다.


(사실 정확하게 말하면, 데이터의 양/join에 따라 브로드캐스트 조인이 좋을지, 셔플 조인이 좋을지는 테스트해봐야 한다. 이를 위해 spark sql  optimizer가 그걸 내부적으로 결정하기도 한다)






아래 코드는 Spark Definitve Guide의 예제 코드인데, explain으로 어떤 조인이 사용되었는지 알 수 있다. 


package spark.example.tutorial.chapter08

import spark.example.common.SparkHelper

object JoinMain extends SparkHelper {
def main(args: Array[String]): Unit = {
import spark.implicits._

val person = Seq(
(0, "Bill Chaambers", 0, Seq(100)),
(1, "Matei Zaharia", 1, Seq(500, 250, 100)),
(2, "Michael Armbrust", 1, Seq(250, 100)))
.toDF("id", "name", "graduate_program", "spark_status")

val graduateProgram = Seq(
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley"))
.toDF("id", "degree", "department", "school")

person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")

val joinExpr1 = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr1).explain()

import org.apache.spark.sql.functions.broadcast
val joinExpr2 = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(graduateProgram), joinExpr2).explain()

}
}


보면 둘다 broadcast join이다. spark 내부  optimizer가 힌트를 주든 안주든 내부적으로 broadcast 조인을 할 수 있게 되었다.


== Physical Plan ==

*(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildLeft

:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)))

:  +- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12]

+- LocalTableScan [id#26, degree#27, department#28, school#29]


== Physical Plan ==

*(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildRight

:- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12]

+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))

   +- LocalTableScan [id#26, degree#27, department#28, school#29]





관련 내용은 참조글은 아래 글을 기반으로 한다.


https://www.waitingforcode.com/apache-spark-sql/shuffle-join-spark-sql/read


https://henning.kropponline.de/2016/12/11/broadcast-join-with-spark/






Posted by '김용환'
,




저장 데이터가 커지면 1대의 장비에 다 저장할 수 없다.


이를 위해 DB에서는 샤딩(sharding)이라는 방식을 사용해 데이터를 분산 저장한다.(사실 플러스로 복제(replica)를 둔다)


hashing, dictionary-hashing, incremental sharding 이렇게 분리한다.



DB 외에 Nosql마다 비슷한 용어를 가지고 있다. 샤딩은 사실 파티션(partition)이라는 일반적인 단어로 포함할 수 있다.



파티션은 일반DB의 샤딩(공식 용어는 아니다),  몽고 DB, 일래스틱서치, 솔라의 샤드(shard)에 해당된다.


HBase에서는 리전(Region), 


구글 Bigtable에서는 테블릿(tablet),


카산드라(Cassandra)와 Riak은 vnode,


Couchbase에서는 vBucket이라 부른다.




Posted by '김용환'
,



spark에서 UDF (UserDefinedFunction)을 사용하다 아래와 같은 에러가 발생했다.


Exception in thread "main" java.lang.InternalError: Malformed class name

at java.lang.Class.getSimpleName(Class.java:1330)

at org.apache.spark.sql.execution.aggregate.ScalaUDAF.nodeName(udaf.scala:451)



아래와 같은 UDF 코드인데. 



object UDFExampleDemo {


    def main(args: Array[String]) {

    

        class BoolAnd extends UserDefinedAggregateFunction {

...

        }

        ...

    }

}



아래와 같이 변경해야 에러가 발생하지 않는다. 



class BoolAnd extends UserDefinedAggregateFunction {

...

}

 

 

object UDFExampleDemo {


    def main(args: Array[String]) {

    

        ...

    }

}





node name을 얻을 려면 udaf의 class의 getSimpleName을 호출한다.

override def nodeName: String = udaf.getClass.getSimpleName


java의 Class코드를 보면, 

Malformed class name을 발생하는 코드 앞에 주석 설명이 잘 되어 있다.  getSimpleName은 적당한 깊이의  클래스의 simple name을 읽어 오는 것만 허락한다.  따라서 udaf를 상속한 클래스의 위치가 main 안에 두면 안된다.





 java.lang.Class의 getSimpleName 코드



public String getSimpleName() {
if (isArray())
return getComponentType().getSimpleName()+"[]";

String simpleName = getSimpleBinaryName();
if (simpleName == null) { // top level class
simpleName = getName();
return simpleName.substring(simpleName.lastIndexOf(".")+1); // strip the package name
}
// According to JLS3 "Binary Compatibility" (13.1) the binary
// name of non-package classes (not top level) is the binary
// name of the immediately enclosing class followed by a '$' followed by:
// (for nested and inner classes): the simple name.
// (for local classes): 1 or more digits followed by the simple name.
// (for anonymous classes): 1 or more digits.

// Since getSimpleBinaryName() will strip the binary name of
// the immediatly enclosing class, we are now looking at a
// string that matches the regular expression "\$[0-9]*"
// followed by a simple name (considering the simple of an
// anonymous class to be the empty string).

// Remove leading "\$[0-9]*" from the name
int length = simpleName.length();
if (length < 1 || simpleName.charAt(0) != '$')
throw new InternalError("Malformed class name");


Posted by '김용환'
,



김창준 씨의 함께 자라기(애자일로 가는 길) 중 구글의 실험 내용을 정리한 아주 일부분만 발췌한다.



Oxygen Project : 구글이 관리자를 없애는 실험을 2002년에 실험했지만 결과는 좋지 않았다. 이에 인재 분석팀을 꾸려 뛰어난 관리자의 특징을 찾는 연구인 Oxgen Project를 2008년에 시작했다.


이후 Aristotle Project를 진행했고 2015년에 발표했다.

https://rework.withgoogle.com/print/guides/5721312655835136/



김창준씨가 중요하게 여긴 세 부분은 다음과 같다.


1. 팀에 누가 있는지(전문가, 내향/외향), 지능 등)보다 팀원들이 서로 어떻게 상호작용하고 자신의 일을 어떻게 바라보는지가 훨씬 중요했다.



2. 5가지 성공적 팀의 특징을 찾았는데, 그 중 압도적 높은 예측력을 보인 변수는 팀의 심리적 안전감(내 생각, 의견, 질문, 걱정, 실수가 드러났을 때 처벌받거나 놀림받지 않을 거라는 믿음)이었다.


3. 팀 토론 등 특별히 고안된 활동(gTeams execise)라 불리는 활동인데, 10분간 5가지 성공적인 팀의 특징에 대해 팀원들이 답하고 팀이 얼마나 잘하는 요약 보고서를 보고, 결과에 대해 면대면 토론을 하고 팀이 개선하게 자원(교육 등)을 제공하는 것이라 한다.





참고로..


외국의 좋은 IT회사에서는 관리자는 면대면 토론을 항상 진행하고 멘토링을 진행한다고 한다. 심리적 안정감을 계속 주는 활동을 진행한다고 한다. 



https://rework.withgoogle.com/print/guides/5721312655835136/ 



팀은 무엇인가?  데이터를 모으고 효율을 측정하는 방법, 효율적인 팀을 정의하는 방법이 있다.


The researchers found that what really mattered was less about who is on the team, and more about how the team worked together. In order of importance:







참고할만한다. 문화를 만드는데 중요한 것 같다..





Posted by '김용환'
,