RDD를 쉽게 카산드라 테이블에 변환하고 저장하는 커넥터가 대표적으로 2개가 있다. 


칼리오페라고 있는데. 사실상 더 개발이 되지 않고 있다. 


https://github.com/tuplejump/calliope


      val transformer = CqlRowReader.columnListMapper[Employee]("deptid", "empid", "first_name", "last_name")


      import transformer._


      val cas = CasBuilder.cql3.withColumnFamily("cql3_test", "emp_read_test")


      val casrdd = sc.cql3Cassandra[Employee](cas)


      val result = casrdd.collect().toList


      result must have length (5)

      result should contain(Employee(20, 105, "jack", "carpenter"))

      result should contain(Employee(20, 106, "john", "grumpy"))




많이 사용되는 cassandra connector는 datastax에서 개발되었다. 

현재 버전까지 잘 지원되고 있다. 

https://github.com/datastax/spark-cassandra-connector


간단한 예제는 다음과 같다. 


import org.apache.spark._ 

import com.datastax.spark.connector._


val conf = new SparkConf(true).set("spark.cassandra.connection.host", "ip") 

val sc = new SparkContext("spark://ip:7077", "test", conf)


val testRDD = sc.cassandraTable("testKs", "kv") 

println(testRDD.count) 

println(testRDD.first)

println(testRDD.map(_.getInt("value")).sum) 


val col = sc.parallelize(Seq(("key3", 3), ("key4", 4))) 

col.saveToCassandra("testKs", "kv", SomeColumns("key", "value"))      




Posted by '김용환'
,






스칼라 의존성 라이브러리를 메이븐에서 검색하고 있다가.. 따로 스칼라 패키지를 검색할 수 있는 웹이 있다는 것을 알게 되었다.


http://spark-packages.org



레디스로 검색하면 두 개의 라이브러리가 있고 레디스랩의 spark redis의 평이 하나 더 있다.. 아무래도 레디스랩이라서 쓸만하지 않을까?


https://spark-packages.org/package/RedisLabs/spark-redis



링크를 따라들어가면 스칼라스럽게 개발되어 있다!!


https://github.com/RedisLabs/spark-redis




Posted by '김용환'
,

[spark] RDD join 예제

scala 2017. 3. 29. 23:14


두 RDD의  join 예제이다. 


Member와 Department RDD를 생성한다.


scala> case class Member(id: Int, name: String, dept: Int)


scala> val members = sc.parallelize(List(Member(1,"john", 100), Member(2,"samuel", 200), Member(3,"ethan", 200)))


scala> case class Department(id: Int, name: String)


scala> val departments = sc.parallelize(List(Department(100, "server"), Department(200, "client")))




바로 join하면 에러가 발생한다. 


scala> members.join(departments)

<console>:34: error: value join is not a member of org.apache.spark.rdd.RDD[Member]

       members.join(departments)



먼저 join될 수 있도록 특정 값으로 그룹핑한다. ShuffledRDD 타입이 되었다.


scala> val groupedMembers = members.groupBy(x => x.dept)

groupedMembers: org.apache.spark.rdd.RDD[(Int, Iterable[Member])] = ShuffledRDD[128] at groupBy at <console>:29



scala> groupedMembers.foreach(println)

(100,CompactBuffer(Member(1,john,100)))

(200,CompactBuffer(Member(2,samuel,200), Member(3,ethan,200)))



scala> val groupedDepartments = departments.groupBy(x => x.id)

groupedDepartments: org.apache.spark.rdd.RDD[(Int, Iterable[Department])] = ShuffledRDD[130] at groupBy at <console>:29


scala> groupedDepartments.foreach(println)

(200,CompactBuffer(Department(200,client)))

(100,CompactBuffer(Department(100,server)))





이제 join한다. 정상적으로 join된 것을 볼 수 있다. 



scala> groupedMembers.join(groupedDepartments).foreach(println)

(200,(CompactBuffer(Member(2,samuel,200), Member(3,ethan,200)),CompactBuffer(Department(200,client))))

(100,(CompactBuffer(Member(1,john,100)),CompactBuffer(Department(100,server))))





api를 살펴보면, 여러 필드로 다양한 join을 시도할 수 있다. 



DataFrame join(DataFrame right)
Cartesian join with another DataFrame.
DataFrame join(DataFrame right, Column joinExprs)
Inner join with another DataFrame, using the given join expression.
DataFrame join(DataFrame right, Column joinExprs, java.lang.String joinType)
Join with another DataFrame, using the given join expression.
DataFrame join(DataFrame right, scala.collection.Seq<java.lang.String> usingColumns)
Inner equi-join with another DataFrame using the given columns.
DataFrame join(DataFrame right, java.lang.String usingColumn)
Inner equi-join with another DataFrame using the given column.



Posted by '김용환'
,



spark에서 2개의 컬럼을 가진 테이블을 생성한다.


scala> val df = sc.parallelize(Seq((1, "samuel"), (2, "jack"), (3, "jonathan"))).toDF("id", "name")

df: org.apache.spark.sql.DataFrame = [id: int, column: string]



제대로 생성되었는지 확인하기 위해 df("")을 사용하면 안된다. Column 객체만 얻기 때문이다.


scala> df("name")

res11: org.apache.spark.sql.Column = name



show와 select를 사용하면 제대로 저장되었는지 확인할 수 있다.


scala> df.show

+---+--------+

| id|    name|

+---+--------+

|  1|  samuel|

|  2|    jack|

|  3|jonathan|

+---+--------+


scala> df.select("id").show

+---+

| id|

+---+

|  1|

|  2|

|  3|

+---+



scala> df.select("name").show

+--------+

|    name|

+--------+

|  samuel|

|    jack|

|jonathan|

+--------+


데이터 프레임의 메타 정보를 알려면 columns, dtypes, printSchema를 사용한다.


scala> df.columns
res42: Array[String] = Array(id, name)

scala> df.dtypes
res43: Array[(String, String)] = Array((id,IntegerType), (name,StringType))


scala> df.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)


그리고 filter,where을 사용해 dataframe의 조건절로 사용할 수 있다. 

scala> df.filter(df("id").equalTo("1")).show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
+---+------+


scala> df.filter("id > 2").show
+---+--------+
| id|    name|
+---+--------+
|  3|jonathan|
+---+--------+



scala> df.filter(df("name").equalTo("samuel")).show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
+---+------+



scala> df.where("id=1").show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
+---+------+


또는 col을 사용해 깔끔하게 사용할 수 도 있다. 


scala> df.where(col("id") isin (Seq("1") : _*)).show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
+---+------+




여러 개의 단어가 있는지 확인하는 isin를 사용할 수 있다. 

scala> df.filter(df("name").isin(Seq("samuel", "jack"): _*)).show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
|  2|  jack|
+---+------+



contains를 사용해 특정 단어가 포함되었는지 확인할 수 있다.



scala> df.where(df("name").contains("j")).show

+---+--------+

| id|    name|

+---+--------+

|  2|    jack|

|  3|jonathan|

+---+--------+



between을 사용해 범위를 안다.


scala> df.where(col("id") between (1,2)).show

+---+------+

| id|  name|

+---+------+

|  1|samuel|

|  2|  jack|

+---+------+



select를 이용해 query language처럼 사용할 수 있다. 


scala> df.select("name").where(col("id") between (1,2)).show

+------+

|  name|

+------+

|samuel|

|  jack|

+------+



컬럼을 dataframe에 추가할 수 있다. 


scala> df.withColumn("xx", col("id")).show

+---+--------+---+

| id|    name| xx|

+---+--------+---+

|  1|  samuel|  1|

|  2|    jack|  2|

|  3|jonathan|  3|

+---+--------+---+



컬럼을 dataframe에서 삭제할 수 있다. 


scala> df.drop(col("xxx"))

res56: org.apache.spark.sql.DataFrame = [id: int, name: string]


scala> df.show

+---+--------+

| id|    name|

+---+--------+

|  1|  samuel|

|  2|    jack|

|  3|jonathan|

+---+--------+




데이터 프레임에 새로운 컬럼을 udf를 이용해서 붙일 수 있다.  



scala> import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.udf


scala> case class Member(id:Int, name:String)

defined class Member


scala> case class Member(id:Int, name:String)

defined class Member


scala> val mudf=udf((id:Int, name:String) => Member(1*100, name))

mudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,StructType(StructField(id,IntegerType,false), StructField(name,StringType,true)),Some(List(IntegerType, StringType)))


scala> val df1 = df.withColumn("newColumn", mudf($"id", $"name"))

df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]


scala> df1.show

+---+--------+--------------+

| id|    name|     newColumn|

+---+--------+--------------+

|  1|  samuel|  [100,samuel]|

|  2|    jack|    [100,jack]|

|  3|jonathan|[100,jonathan]|

+---+--------+--------------+


Posted by '김용환'
,



장치에 남은 공간이 없다는 hive 에러가 발생했다면 로컬(local) 파일 시스템에 용량이 없음을 의미한다.


df로 확인한다.



rom log4j:ERROR Failed to flush writer,

java.io.IOException: 장치에 남은 공간이 없음

at java.io.FileOutputStream.writeBytes(Native Method)

at java.io.FileOutputStream.write(FileOutputStream.java:345)

at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)

at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)

at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)

at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)

at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)

at org.apache.log4j.helpers.QuietWriter.flush(QuietWriter.java:59)

at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:324)

at org.apache.log4j.DailyRollingFileAppender.subAppend(DailyRollingFileAppender.java:369)

at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)

at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)

at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)

at org.apache.log4j.Category.callAppenders(Category.java:206)

at org.apache.log4j.Category.forcedLog(Category.java:391)

at org.apache.log4j.Category.log(Category.java:856)

at org.apache.commons.logging.impl.Log4JLogger.info(Log4JLogger.java:176)

at org.apache.hadoop.hive.ql.session.SessionState.createPath(SessionState.java:641)

at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:578)

at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)

at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:671)

at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:615)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.hadoop.util.RunJar.run(RunJar.java:221)

at org.apache.hadoop.util.RunJar.main(RunJar.java:136)




Posted by '김용환'
,




java7 이하에서는 Callable 메소드를 생성하면 다음과 같은 형식으로 개발했다. 



import java.util.concurrent.Callable;

import java.util.concurrent.FutureTask;

...


                 String text = "1";

Callable<Integer> task = new Callable<Integer>() {

@Override

public Integer call() throws Exception {

return Integer.parseInt(text);

}

};


FutureTask<Integer> futureTask = new FutureTask<Integer>(task);

futureTask.run();

try {

System.out.println(futureTask.get());

} catch (Exception e) {

e.printStackTrace();

}




java8에서는 Callable의 call을 람다표현으로 깔끔하게 구성할 수 있다. 



import java.util.concurrent.Callable;

import java.util.concurrent.FutureTask;

...

String text = "1";

FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {

return Integer.parseInt(text);

});

futureTask.run();

try {

System.out.println(futureTask.get());

} catch (Exception e) {

e.printStackTrace();

}





그 이유는 java에서 @FunctionalInterface를 사용하는 코드는 알아서 람다 표현식으로 사용할 수 있게 한다.


@FunctionalInterface

public interface Callable<V> {

    /**

     * Computes a result, or throws an exception if unable to do so.

     *

     * @return computed result

     * @throws Exception if unable to compute a result

     */

    V call() throws Exception;

}


 

Runnable도 동일하다. 


@FunctionalInterface

public interface Runnable {

    /**

     * When an object implementing interface <code>Runnable</code> is used

     * to create a thread, starting the thread causes the object's

     * <code>run</code> method to be called in that separately executing

     * thread.

     * <p>

     * The general contract of the method <code>run</code> is that it may

     * take any action whatsoever.

     *

     * @see     java.lang.Thread#run()

     */

    public abstract void run();

}



비슷하게 Runable을 테스트해본다.



Runnable runabble = new Runnable() {

  @Override

  public void run() {

  System.out.println("Run!!");

  }

};

Thread th = new Thread(runabble);

th.start();

   Thread.sleep(1000);



위의 코드와 아래 코드는 동일하다. 


                Thread th = new Thread(() -> {

System.out.println("Run!!");

});

th.start();

              Thread.sleep(1000);





java8의 functional interface는 java language spec(https://docs.oracle.com/javase/specs/jls/se8/html/jls-9.html#jls-9.8)에서 정의되어 있고 오직 하나의 abstract method으로 구성된다.


Invalid '@FunctionalInterface' annotation; ... is not a functional interface가 뜬다면 잘못 functional interface를 정의했다는 것을 컴파일에러로 알려준다. 



FunctionalInterface를 사용자 정의 선언을 할 수 있다. 


@FunctionalInterface

interface BindCallable {

public List<String> bind();

}


class Bind {

  public List<String> process(BindCallable callable){

    return callable.bind();

  }

}



간단한 예시이다. 


Bind bind = new Bind();

List<String> result = bind.process(() -> {

final String a = "a";

final String b = "b";

List<String> list = Lists.newArrayList();

list.add(a); list.add(b);

return list;

});

System.out.println(result);




Posted by '김용환'
,

메소스(mesos) 공부

scribbling 2017. 3. 27. 14:31


최근 트위터에서 장비 구성을 소개하는 기술 블로그를 작성했다.


https://blog.twitter.com/2017/the-infrastructure-behind-twitter-scale


메소스 기반으로 서비스를 구성하고 있음을 느낄 수 있었다. 









메소스를 많이 활용한 것으로 나타나는 데, 특징적인 것은 코드 라인이 줄어들고 있다는 점이다. 




https://www.wired.com/2013/03/google-borg-twitter-mesos/에 따르면, 트위터의 메소스 역사를 살펴볼 수 있다.



These were Twitter engineers who had once worked at Google: John Sirois, Travis Crawford, and Bill Farner. They told Hindman that they missed Borg, and that Mesos seemed like the perfect way to rebuild it.



He and his fellow engineers continued to run Mesos as an open source software project, but at Twitter, he also worked to move the platform into the company’s data center and fashion something very similar to Google Borg.


트위터의 전직 구글 개발자는 메소스를 구글의 Borg(https://static.googleusercontent.com/media/research.google.com/ko//pubs/archive/43438.pdf)를 생각할 수 있다고 한다.






트위터에서 엔지니어링을 이끌었던 라이버가 트위터의 신규 서비스는 모두 메소스를 사용하도록 했다고 한다. 


http://www.zdnet.co.kr/news/news_view.asp?artice_id=20140819141405

라이버트 CEO는 트위터의 엔지니어링을 이끌었던 인물이다. 그는 트위터의 신규 서비스에 모두 메소스를 사용하도록 했다. 트위터를 떠난 후 에어비앤비에서 메소스 상에 분석 환경을 구축했다. ETL 관리와 스케줄리을 위한 크로노스 아파치 메소스 프레임워크의 주요 저자기도 하다.




메소스의 특징을 한 장으로 표현하면 다음과 같다.



출처 : https://www.slideshare.net/mesosphere/apache-mesos-and-mesosphere-live-webcast-by-ceo-and-cofounder-florian-li





메소스는 다음과 같은 큰 특징이 있다.


  • 효율성(Efficiency) : 운영 체제와 같은 메소스 스케쥴링 기능은 애플리케이션(프레임워크)에서 CPU와 메모리를 관리한다.
  • 고가용성(High availability) : 아파치 주키퍼를 사용해 가용성을 높인다.
  • 모니터링 인터페이스(Monitoring Interface) : 메소스에는 클러스터 모니터링을 위한 넌블러킹(non-blocking) 웹 UI가 있다.
  • 자원 격리 : 리눅스와 도커(Docker) 컨테이너 아키텍처를 지원한다. multi tenancy를 지원한다. 
  • 확장성(Scalability) : 현재까지 현재 버전으로 최대 50,000개의 노드를 지원한다.

메소스 프레임워크는 메소스와 애플리케이션 간의 인터페이스이다.

프레임워크는 작업이 스케쥴링되고 실행되는 계층(layer)이다. 

프레임워크 구현은 애플리케이션에 의존적이어서 프레임워크와 애플리케이션이라는 용어를 불명확하게 사용하고 있다.
메소스 v.0.19.0 이전에는 메소스 프레임워크는 C++ 라이브러리인 libmesos와 바인딩을 해서 개발되었다. Mesos v.0.19.0부터는 개발자가 libmesos 라이브러리 호출없이 원하는 언어를 사용할 수 있도록 HTTP 기반 프로토콜이 도입되었다.

메소스 API를 사용하면 프로그래머가 메소스에서 애플리케이션을 실행하기 위한 자체 프레임워크를 작성할 수 있다.

프레임워크는 스케줄러(scheduler)와 익스큐터(executor)의 두 부분으로 구성된다.

  • 스케줄러(Scheduler) : 제공할 자원에 대한 결정을 내리고 클러스터의 현재 상태를 추적한다. 
  • 익스큐터(Executor) : 슬레이브 노드의 태스크 실행을 책임진다. 

현재 메소스 프레임워크의 목록은 다음과 같다. (특이할 점은 hbase가 없다..)
http://mesos.apache.org/documentation/latest/frameworks/




좀 더 내용을 살펴보려면 메소스 소개자료 1을 참조한다.

Introduction to Apache Mesos from tomasbart


메소스 소개자료 2이다. 



앞의 두 자료를 이해했다면, 아래 슬라이드를 참조하면 좋을 것 같다. 


Introduction to Apache Mesos from Joe Stein


2페이지에서 자료에서 나오는 링크는 다음과 같다.

http://static.usenix.org/event/nsdi11/tech/full_papers/Hindman_new.pdf

https://www.youtube.com/watch?v=0ZFMlO98Jkc&feature=youtu.be

https://static.googleusercontent.com/media/research.google.com/ko//pubs/archive/41684.pdf




찾아보니 슬라이드 자료도 있다. 


Scaling Like Twitter with Apache Mesos from Mesosphere Inc.




* API 정리


메소스의 프로토콜 버퍼 api



Executor API
  • registered : 익스큐터와 메소스와 연결되면 호출된다.
  • reregistered : 익스큐터가 재시작된 슬레이브로 재등록할 때호출된다.
  • disconnected : 익스큐터가 슬레이브와 끊어졌을 때 호출된다.
  • launchTask : executor에서 작업이 시작될 때 호출된다.
  • killTask : 익스큐터에서 실행중인 작업이 종료될 때 호출된다
  • frameworkMessage : 프레임워크의 메시지가 익스큐터에 도착하면 호출된다
  • shutdown : 익스큐터가 현재 실행 중인 모든 작업을 종료해야 할 때 호출된다
  • error : 익스큐터와 익스큐터의 드라이버에서 치명적인 에러가 발생했을 때 호출된다



Executor Driver API

  • start : 익스큐터 드라이버를 시작한다
  • stop :  executor 드라이버를 중지한다.
  • abort :  드라이버를 중단하고 더 이상 익스큐터에 콜백을 만들 수 없도록 한다
  • join :  드라이버가 중지되거나 중단될 때까지 대기한다
  • run :  드라이버를 시작하고 즉시 join() 메서드를 호출한다.
  • sendStatusUpdate :  프레임워크 스케줄러에 상태 변경를 보낸다.
  • sendFrameworkMessage :  프레임워크 스케줄러에게 메시지를 보낸다.


Scheduler API

  • disconnected : 스케줄러가 마스터와의 연결이 끊어지면 호출된다
  • error : 스케줄러 또는 드라이버에 복구 할 수 없는 에러가 발생하면 호출된다
  • executorLost : 익스큐터가 종료되거나 종료될 때 호출된다
  • frameworkMessage : 익스큐터가 메시지를 보낼 때 호출된다
  • offerRescinded : 쿠폰이 더 이상 유효하지 않으면 호출된다
  • registered : 스케줄러가 메소스 마스터에 성공적으로 등록할 때 호출된다
  • reregistered : 스케줄러가 새로 선출된 메소스 마스터에 재등록할 때 호출된다
  • resourceOffers : 자원이 프레임워크에 제공되었을 때 호출된다
  • slaveLost : 슬레이브가 도달할 수 없다고 판단되면 호출된다
  • statusUpdate : 작업 상태가 변경되면 호출된다



Scheduler Driver API

  • abort : 더 이상 콜백이 스케줄러에 생성되지 않도록 드라이버를 중단한다
  • acceptOffers : 주어진 제안을 허용하고 허용된 제안에 대한 일련의 작업을 수행한다
  • acceptOffers : 상태 변경을 확인한다
  • declineOffer : 제안을 전체적으로 거부한다
  • declineOffer : 제안 전체를 거부하고 지정된 필터를 자원에 적용한다
  • join : 드라이버가 멈추거나 중단될 때까지 대기하고 현재 스레드를 무기한 차단한다.
  • killTask : 지정된 작업을 종료한다
  • launchTasks : 주어진 작업 집합을 실행한다
  • reconcileTasks : 해당 코드를 사용하면 프레임워크가 터미널이 아닌 작업의 상태를 확인할 수 있다
  • requestResources : 메소스의 자원를 요청한다
  • reviveOffers : 프레임워크에서 이전에 설정한 모든 필터를 삭제한다
  • run : 드라이버를 시작하고 즉시 조인한다(예, 블럭).
  • sendFrameworkMessage : 프레임워크에서 익스큐터 중 하나에게 메시지를 보낸다
  • start : 스케줄러 드라이버를 시작한다
  • stop : 해당코드는 스케줄러 드라이버를 중지한다. 또는 페일 오버(failover)가 없다고 가정하는 스케줄러 드라이버를 중지한다
  • suppressOffers : 메소스 마스터에게 프레임워크에 대한 제안을 보내는 것을 중지하도록 알려준다




Posted by '김용환'
,


경험상, 통신 서버을 아무리 잘 만들어도 serialization/deserialization에서 성능 저하가 발생한다. 이 부분에 대한 trade off를 늘 고민해야 한다.




최근 트위터에서 스트림 처리 서버 관련 내용을 잘 설명해서 펌질 한다.



http://delivery.acm.org/10.1145/2750000/2742788/p239-kulkarni.pdf?ip=211.56.96.51&id=2742788&acc=TRUSTED&key=4D4702B0C3E38B35%2E4D4702B0C3E38B35%2E4D4702B0C3E38B35%2EE47D41B086F0CDA3&CFID=743695794&CFTOKEN=58998503&__acm__=1490582784_82cd3df2ca63b6c7de75e31c941cdfac




https://blog.twitter.com/2017/optimizing-twitter-heron



  • Repeated Serialization - A tuple is represented as a collection of plain old Java objects. The Java objects are converted into byte arrays using either Kryo or Java serialization. The byte arrays are again serialized when included in a protocol buffers object used for data exchange between stream managers and Heron instances.
  • Eager Deserialization - The stream manager is responsible for receiving and routing protocol buffer tuples. When it receives a tuple, it eagerly deserializes it into a C++ object.
  • Immutability - To simplify the implementation and reasoning, stream manager does not reuse any protobuf objects. For each message received, it uses the malloc allocator to allocate a protobuf object, which it then releases back to the allocator once the operation is completed. Instead of modifying the protobuf in place, it copies the contents to a newly allocated message, makes the modification on the new message and releases the old one.




  • ~17% of the CPU is used to create/delete a protobuf object from memory allocator (not including those protobuf objects allocated on stack).
  • ~15% of the CPU is used to copy a new protobuf object instead of updating one in place.
  • ~18% of the CPU is used to eagerly deserialize a protobuf message, despite the fact that eager deserialization is not needed; instead we could just handle the byte array.




리팩토링 한 부분


  • Added a separate memory pool for each type of protobuf message thereby reducing the expensive cost to create/delete a protobuf message.
  • Changed an internal data structure that caches tuples from std::list to std::deque to facilitate preallocation of protobuf messages.
  • Optimized away the code that was duplicating the protobuf message to do in-place update whenever possible.
  • When a stream manager receives a message from another stream manager, instead of eagerly deserializing the inner tuple message, it now transfers the underlying serialized byte array directly to the instance.



Posted by '김용환'
,


spring3는 ClassPathBeanDefinitionScanner을 제공해서 spring container에 bean을 생성할 수 있도록 도와준다.



ClassPathBeanDefinitionScanner scanner = new PlayClassPathBeanDefinitionScanner(applicationContext);

String scanBasePackage = ..

scanner.scan(scanBasePackage.split(","));



예를 들어, play1 playframework의 spring module은 ClassPathBeanDefinitionScanner을 사용한다. 

https://github.com/pepite/Play--framework-Spring-module/blob/master/src/play/modules/spring/SpringPlugin.java

그런데.. play1 playframework의 spinrg module을 spring4로 변경하면 동작이 안된다. 

spring3과 spring4의 차이를 살펴본다.


문제가 되는 부분을 설명한다. 내부에 ConfigurationClassParser 클래스의 asSourceClass 메소드가 변경되었다.
spring3 코드와 달리 spring4에서는 외부에서 읽힐 때 현재 classload에서 읽지 목한다면 ClassNotFoundException이 생긴다.(안전함을 더 중요하게 여기고 수정되었다)

<spring 3>


/**
* Factory method to obtain a {@link SourceClass} from a {@link Class}.
*/
public SourceClass asSourceClass(Class<?> classType) throws IOException {
try {
// Sanity test that we can read annotations, if not fall back to ASM
classType.getAnnotations();
return new SourceClass(classType);
}
catch (Throwable ex) {
// Enforce ASM via class name resolution
return asSourceClass(classType.getName());
}
}




<spring 4>


https://github.com/spring-projects/spring-framework/blob/5b98a54c9b9f8c2f4332734ee23cd483b7df0d22/spring-context/src/main/java/org/springframework/context/annotation/ConfigurationClassParser.java#L659


/**

 * Factory method to obtain a {@link SourceClass} from a class name.
 */
public SourceClass asSourceClass(String className) throws IOException {
if (className.startsWith("java")) {
// Never use ASM for core java types
try {
return new SourceClass(this.resourceLoader.getClassLoader().loadClass(className));
}
catch (ClassNotFoundException ex) {
throw new NestedIOException("Failed to load class [" + className + "]", ex);
}
}
return new SourceClass(this.metadataReaderFactory.getMetadataReader(className));
}


참고

https://jira.spring.io/browse/SPR-15245



Posted by '김용환'
,

spark-shell에서 특정 라이브러리를 사용하려면 의존성 라이브러리도 같이 사용해야 한다. 


매번 메이븐 저장소에서 찾는 것이 귀찮은데, spark-shell에 packages 플래그를 사용하면 편하게 라이브러리를 다운받을 수 있다. 



 ./spark-shell --packages datastax:spark-cassandra-connector:1.6.0-s_2.11



Ivy Default Cache set to: /Users/samuel.kim/.ivy2/cache

The jars for the packages stored in: /Users/samuel.kim/.ivy2/jars

:: loading settings :: url = jar:file:/usr/local/spark-2.1.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml

datastax#spark-cassandra-connector added as a dependency

:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0

confs: [default]

found datastax#spark-cassandra-connector;1.6.0-s_2.11 in spark-packages

found org.apache.cassandra#cassandra-clientutil;3.0.2 in central

found com.datastax.cassandra#cassandra-driver-core;3.0.0 in central

found io.netty#netty-handler;4.0.33.Final in local-m2-cache

found io.netty#netty-buffer;4.0.33.Final in local-m2-cache

found io.netty#netty-common;4.0.33.Final in local-m2-cache

found io.netty#netty-transport;4.0.33.Final in local-m2-cache

found io.netty#netty-codec;4.0.33.Final in local-m2-cache

found io.dropwizard.metrics#metrics-core;3.1.2 in list

found org.slf4j#slf4j-api;1.7.7 in list

found org.apache.commons#commons-lang3;3.3.2 in list

found com.google.guava#guava;16.0.1 in list

found org.joda#joda-convert;1.2 in list

found joda-time#joda-time;2.3 in central

found com.twitter#jsr166e;1.1.0 in central

found org.scala-lang#scala-reflect;2.11.7 in list

[2.11.7] org.scala-lang#scala-reflect;2.11.7

downloading http://dl.bintray.com/spark-packages/maven/datastax/spark-cassandra-connector/1.6.0-s_2.11/spark-cassandra-connector-1.6.0-s_2.11.jar ...

[SUCCESSFUL ] datastax#spark-cassandra-connector;1.6.0-s_2.11!spark-cassandra-connector.jar (3339ms)

:: resolution report :: resolve 4700ms :: artifacts dl 3348ms

:: modules in use:

com.datastax.cassandra#cassandra-driver-core;3.0.0 from central in [default]

com.google.guava#guava;16.0.1 from list in [default]

com.twitter#jsr166e;1.1.0 from central in [default]

datastax#spark-cassandra-connector;1.6.0-s_2.11 from spark-packages in [default]

io.dropwizard.metrics#metrics-core;3.1.2 from list in [default]

io.netty#netty-buffer;4.0.33.Final from local-m2-cache in [default]

io.netty#netty-codec;4.0.33.Final from local-m2-cache in [default]

io.netty#netty-common;4.0.33.Final from local-m2-cache in [default]

io.netty#netty-handler;4.0.33.Final from local-m2-cache in [default]

io.netty#netty-transport;4.0.33.Final from local-m2-cache in [default]

joda-time#joda-time;2.3 from central in [default]

org.apache.cassandra#cassandra-clientutil;3.0.2 from central in [default]

org.apache.commons#commons-lang3;3.3.2 from list in [default]

org.joda#joda-convert;1.2 from list in [default]

org.scala-lang#scala-reflect;2.11.7 from list in [default]

org.slf4j#slf4j-api;1.7.7 from list in [default]

---------------------------------------------------------------------

|                  |            modules            ||   artifacts   |

|       conf       | number| search|dwnlded|evicted|| number|dwnlded|

---------------------------------------------------------------------

|      default     |   16  |   1   |   1   |   0   ||   16  |   1   |

---------------------------------------------------------------------



만약 proxy 같은 환경 설정을 받는다면 다음과 같이 사용한다 


 ./spark-shell --conf "spark.driver.extraJavaOptions=-Dhttp.proxyHost=<proxyHost> -Dhttp.proxyPort=<proxyPort> -Dhttps.proxyHost=<proxyHost> -Dhttps.proxyPort=<proxyPort>" --packages datastax:spark-cassandra-connector:1.6.0-s_2.11


Posted by '김용환'
,