여러 컨슈머가 동일 토픽에서 메시지를 읽을 때 사용하는 주요 패턴은  다음 2가지이다.


<로드 밸런싱(load balancing)>


각 메시지는 여러 컨슈머 중 특정 컨슈머에 전달된다. 따라서 컨슈머들(컨슈머 그룹)은 해당 토픽의 메시지를 처리하는 작업을 공유한다. 브로커는 메시지를 전달할 컨슈머를 임의로 지정한다. 이 패턴은 메시지를 처리하는 비용이 비싸서 처리를 병렬화하기 위해 컨슈머를 추가하고 싶을 때 유용하다. 또한 컨슈머가 죽으면 관련 컨슈머 그룹에서 나머지 실시간  컨슈머에게 나누어진다.


IBM 문서에 친절히 설명되어 있다.

https://www.ibm.com/support/knowledgecenter/ko/SSCGGQ_1.2.0/com.ibm.ism.doc/Developing/sharedsubscriptionsinjms.html



AMQP는 같은 큐를 소비하는 클라이언트를 여러 개 둬서 로드 밸런싱을 구현할 수 있다. JMS에서는 이 방식을 shared subscription이라 한다. 카프카에서는 load share라고 한다.



<팬 아웃(fan out)>


각 메시지가 모든 컨슈머에 전달된다. 컨슈머가 브로드캐스팅된 동일한 메시지를 서로 간섭없이 들을 수 있다. 


JMS에서는 토픽 구독, AMQP에서는 바인딩 교환이라는 용어를 사용한다. 카프카에서는 팬 아웃이라고 한다.



카프카의 경우, 컨슘될 때 카프카에서 메시지가 제거되지 않아서 소비자를 여럿 추가 할 수 있고 각 컨슈머는 자체 메시지 오프셋을 유지 관리할 수 있습니다. 물론 컨슈머는 서로 다른 컨슈머 그룹에 있어야 한다.





이 두 패턴은 함께 사용할 수 있다.




Posted by 김용환 '김용환'


Spark에서 Sqlite DB 테이블을 읽어오는 예시이다. 


spark jdbc로 데이터 프레임을 읽을 때 항상 모든 테이블 로우를 읽을 수 있을 뿐 아니라.


특정 쿼리의 데이터만 읽을 수 있다.




두가지 방법이 있는데. 


첫 번째는 option("dbtable)에  쿼리를 추가하는 방법,


두 번째는 jdbc를 읽을 때 predicate(where절 같은 형태)를 추가하는 방법이 있다.




object JDBCMain extends SparkHelper {
def main(args: Array[String]): Unit = {
val driver = "org.sqlite.JDBC"
val path = "origin-source/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:${path}"
val tablename = "flight_info"

// driver loading
import java.sql.DriverManager
Class.forName("org.sqlite.JDBC")
val connection = DriverManager.getConnection(url)
println(connection.isClosed)
println(connection.close())

val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
val newDbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.load()
newDbDataFrame.explain()

println("predicates--")
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array(
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates, props).count())
println(spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions)

val predicates2 = Array(
"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates2, props).count())
println(spark.read.jdbc(url, tablename, predicates2, props).rdd.getNumPartitions)



이 예시는 Spark Definitive Guide에 있고 깃허브(https://github.com/knight76/spark-definitive-guide-sbt)에 저장되어 있다. 



Posted by 김용환 '김용환'



스파크 (spark)  join은 두가지 전략이 있다.


셔플 조인(shuffle join)과 브로드 캐스트 조인(broadcast join)이 있다.


이 기반에는 wide dependency와 narrow dependency가  있다. 즉 최대한 driver와 executor 간 데이터 교환의 차이를 설명한 것으로서 개발 코드에 따라 성능이 달라진다.

(https://knight76.tistory.com/entry/spark-%ED%8E%8C%EC%A7%88-wide-dependecy-narrow-dependency)



셔플 조인은 조인된 데이터를 동일한 executor로 이동하기 위해 셔플 연산을 사용한다. 각 로우를 해시로 표현 및 생성한 후 적절한 장소에 보낸다. 따라서 데이터 이동이 많이 발생한다. 내부적으로 merge sort join보다 더 많은 비용이 드는 해싱을 사용한다.


따라서 join시 데이터  흐름이 narrow해지고 executor간의 데이터 복사 비용으로 속도가 떨어지게 된다.






브로드캐스트 조인은 데이터를  executor로 복사하지만,  executor간에 데이터복사가 일어나지 않기 때문에 속도가 빨라질 수 있다.


(사실 정확하게 말하면, 데이터의 양/join에 따라 브로드캐스트 조인이 좋을지, 셔플 조인이 좋을지는 테스트해봐야 한다. 이를 위해 spark sql  optimizer가 그걸 내부적으로 결정하기도 한다)






아래 코드는 Spark Definitve Guide의 예제 코드인데, explain으로 어떤 조인이 사용되었는지 알 수 있다. 


package spark.example.tutorial.chapter08

import spark.example.common.SparkHelper

object JoinMain extends SparkHelper {
def main(args: Array[String]): Unit = {
import spark.implicits._

val person = Seq(
(0, "Bill Chaambers", 0, Seq(100)),
(1, "Matei Zaharia", 1, Seq(500, 250, 100)),
(2, "Michael Armbrust", 1, Seq(250, 100)))
.toDF("id", "name", "graduate_program", "spark_status")

val graduateProgram = Seq(
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley"))
.toDF("id", "degree", "department", "school")

person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")

val joinExpr1 = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr1).explain()

import org.apache.spark.sql.functions.broadcast
val joinExpr2 = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(graduateProgram), joinExpr2).explain()

}
}


보면 둘다 broadcast join이다. spark 내부  optimizer가 힌트를 주든 안주든 내부적으로 broadcast 조인을 할 수 있게 되었다.


== Physical Plan ==

*(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildLeft

:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)))

:  +- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12]

+- LocalTableScan [id#26, degree#27, department#28, school#29]


== Physical Plan ==

*(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildRight

:- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12]

+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))

   +- LocalTableScan [id#26, degree#27, department#28, school#29]





관련 내용은 참조글은 아래 글을 기반으로 한다.


https://www.waitingforcode.com/apache-spark-sql/shuffle-join-spark-sql/read


https://henning.kropponline.de/2016/12/11/broadcast-join-with-spark/






Posted by 김용환 '김용환'




저장 데이터가 커지면 1대의 장비에 다 저장할 수 없다.


이를 위해 DB에서는 샤딩(sharding)이라는 방식을 사용해 데이터를 분산 저장한다.(사실 플러스로 복제(replica)를 둔다)


hashing, dictionary-hashing, incremental sharding 이렇게 분리한다.



DB 외에 Nosql마다 비슷한 용어를 가지고 있다. 샤딩은 사실 파티션(partition)이라는 일반적인 단어로 포함할 수 있다.



파티션은 일반DB의 샤딩(공식 용어는 아니다),  몽고 DB, 일래스틱서치, 솔라의 샤드(shard)에 해당된다.


HBase에서는 리전(Region), 


구글 Bigtable에서는 테블릿(tablet),


카산드라(Cassandra)와 Riak은 vnode,


Couchbase에서는 vBucket이라 부른다.




Posted by 김용환 '김용환'



spark에서 UDF (UserDefinedFunction)을 사용하다 아래와 같은 에러가 발생했다.


Exception in thread "main" java.lang.InternalError: Malformed class name

at java.lang.Class.getSimpleName(Class.java:1330)

at org.apache.spark.sql.execution.aggregate.ScalaUDAF.nodeName(udaf.scala:451)



아래와 같은 UDF 코드인데. 



object UDFExampleDemo {


    def main(args: Array[String]) {

    

        class BoolAnd extends UserDefinedAggregateFunction {

...

        }

        ...

    }

}



아래와 같이 변경해야 에러가 발생하지 않는다. 



class BoolAnd extends UserDefinedAggregateFunction {

...

}

 

 

object UDFExampleDemo {


    def main(args: Array[String]) {

    

        ...

    }

}





node name을 얻을 려면 udaf의 class의 getSimpleName을 호출한다.

override def nodeName: String = udaf.getClass.getSimpleName


java의 Class코드를 보면, 

Malformed class name을 발생하는 코드 앞에 주석 설명이 잘 되어 있다.  getSimpleName은 적당한 깊이의  클래스의 simple name을 읽어 오는 것만 허락한다.  따라서 udaf를 상속한 클래스의 위치가 main 안에 두면 안된다.





 java.lang.Class의 getSimpleName 코드



public String getSimpleName() {
if (isArray())
return getComponentType().getSimpleName()+"[]";

String simpleName = getSimpleBinaryName();
if (simpleName == null) { // top level class
simpleName = getName();
return simpleName.substring(simpleName.lastIndexOf(".")+1); // strip the package name
}
// According to JLS3 "Binary Compatibility" (13.1) the binary
// name of non-package classes (not top level) is the binary
// name of the immediately enclosing class followed by a '$' followed by:
// (for nested and inner classes): the simple name.
// (for local classes): 1 or more digits followed by the simple name.
// (for anonymous classes): 1 or more digits.

// Since getSimpleBinaryName() will strip the binary name of
// the immediatly enclosing class, we are now looking at a
// string that matches the regular expression "\$[0-9]*"
// followed by a simple name (considering the simple of an
// anonymous class to be the empty string).

// Remove leading "\$[0-9]*" from the name
int length = simpleName.length();
if (length < 1 || simpleName.charAt(0) != '$')
throw new InternalError("Malformed class name");


Posted by 김용환 '김용환'



김창준 씨의 함께 자라기(애자일로 가는 길) 중 구글의 실험 내용을 정리한 아주 일부분만 발췌한다.



Oxygen Project : 구글이 관리자를 없애는 실험을 2002년에 실험했지만 결과는 좋지 않았다. 이에 인재 분석팀을 꾸려 뛰어난 관리자의 특징을 찾는 연구인 Oxgen Project를 2008년에 시작했다.


이후 Aristotle Project를 진행했고 2015년에 발표했다.

https://rework.withgoogle.com/print/guides/5721312655835136/



김창준씨가 중요하게 여긴 세 부분은 다음과 같다.


1. 팀에 누가 있는지(전문가, 내향/외향), 지능 등)보다 팀원들이 서로 어떻게 상호작용하고 자신의 일을 어떻게 바라보는지가 훨씬 중요했다.



2. 5가지 성공적 팀의 특징을 찾았는데, 그 중 압도적 높은 예측력을 보인 변수는 팀의 심리적 안전감(내 생각, 의견, 질문, 걱정, 실수가 드러났을 때 처벌받거나 놀림받지 않을 거라는 믿음)이었다.


3. 팀 토론 등 특별히 고안된 활동(gTeams execise)라 불리는 활동인데, 10분간 5가지 성공적인 팀의 특징에 대해 팀원들이 답하고 팀이 얼마나 잘하는 요약 보고서를 보고, 결과에 대해 면대면 토론을 하고 팀이 개선하게 자원(교육 등)을 제공하는 것이라 한다.





참고로..


외국의 좋은 IT회사에서는 관리자는 면대면 토론을 항상 진행하고 멘토링을 진행한다고 한다. 심리적 안정감을 계속 주는 활동을 진행한다고 한다. 



https://rework.withgoogle.com/print/guides/5721312655835136/ 



팀은 무엇인가?  데이터를 모으고 효율을 측정하는 방법, 효율적인 팀을 정의하는 방법이 있다.


The researchers found that what really mattered was less about who is on the team, and more about how the team worked together. In order of importance:







참고할만한다. 문화를 만드는데 중요한 것 같다..





Posted by 김용환 '김용환'



Spark 데이터 프레임의 StatFunctions 패키지 함수 중 monotonically_increasing_id를 사용하면

데이터 프레임의 로우에 할당된 고유 ID를 출력한다.



import org.apache.spark.sql.functions.monotonically_increasing_id
df.select(monotonically_increasing_id()).show(5)



결과


+-----------------------------+

|monotonically_increasing_id()|

+-----------------------------+

|                            0|

|                            1|

|                            2|

|                            3|

|                            4|

+-----------------------------+

only showing top 5 rows


Posted by 김용환 '김용환'



Spark에서 DataFrame으로 장난치다가

 requirement failed: Currently correlation calculation for columns with dataType string not supported.라는 에러를 만날 수 있다. 


이는 데이터 타입을 inferScheme을 통해 추론하는데 데이터 타입이 Long/Int로 변환되야 하는데 String 타입으로 변환된 컬럼 데이터를 corr라는 sql 함수로 계산하다가 에러가 발생한 것이다.


이럴 때는 명시적으로 StructType을 사용해 스키마를 지원하는 것이 좋다.



import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

val myManualSchema = StructType(Array(
StructField("InvoiceNo", LongType, true),
StructField("StockCode", StringType, true),
StructField("Description", StringType, true),
StructField("Quantity", LongType, true),
StructField("InvoiceDate", StringType, true),
StructField("UnitPrice", LongType, true),
StructField("CustomerID", StringType, true),
StructField("Country", StringType, true)
))

val df = spark.read.format("csv")
.option("header", true)
.schema(myManualSchema)
.load("origin-source/data/retail-data/by-day/2010-12-01.csv")


Posted by 김용환 '김용환'


ssh-keyscan, ssh-keygen 예시






$ ssh-keyscan github.com


github.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkccK...





$ ssh-keyscan -H github.com


|1|IVo9dmTn5FMnAkZ+4xWUuevH5To=|BQBOJ80KCa5BxDJxjGV+ElrfSvw= ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkccK..





$ ssh-keygen -lf <(ssh-keyscan github.com 2>/dev/null)


2048 SHA256:nThbg6kXUpJWGl7E1IGO,,, github.com (RSA)







$ ssh-keygen -lf <(ssh-keyscan -H github.com 2>/dev/null)


2048 SHA256:nThbg6kXUpJWGl7E1IGO... (RSA)

Posted by 김용환 '김용환'





깃허브를 깃 서버로 사용한다면 깃허브 웹 사이트에서 SSH 키 지문(http://bit.ly/1DffcxK)을 조회할 수 있다.


이 글을 쓰는 시점에서 깃허브의 base64 포맷의 SHA256 RSA 지문(최신 포맷)은 SHA256:abc...이며 16진수 MD5 RSA 지문(이전 포맷)은 11:22:33:...:44의 포맷이다.


OpenSSH 6.8에서 기본 지문 포맷을 16진수 MD5에서 base64 SHA256으로 변경했고 현재 번역하는 시점의 7.9에서도 여전히 동일한 포맷을 사용 중이다


https://www.openssh.com/txt/release-7.9


Posted by 김용환 '김용환'