'2016/10/28'에 해당되는 글 2건

  1. 2016.10.28 [scala] Vector
  2. 2016.10.28 zeppelin 과 spark 연동

[scala] Vector

scala 2016. 10. 28. 17:54



Java의 Vector와 달리 Scala의 Vector의 오퍼레이션은 거의 상수 시간대의 성능이 나올 정도로 훌륭하다. (정확히 말하면 효율적 상수 시간이다)


http://docs.scala-lang.org/overviews/collections/performance-characteristics.html



                        headtailapplyupdateprependappendinsert
VectoreCeCeCeCeCeC-



Vector는 Immutable 컬렉션이고, 트라이(https://en.wikipedia.org/wiki/Trie)라 부르는 순서가 있는 트리 데이터 구조로 구현되어 있다. 트라이에서, 키는 Vector에 저장된 값의 인덱스이다.




Vector의 구현은 패리티(parity) 32의 트리 구조이다. 각 노드는 32개의 배열로 구현되고, 


자식 노드 참조를 32개까지 저장하거나, 32개까지 값을 저장할 수 있다. 


* 주석을 보면, 다음과 같이 설명되어 있다. 

It is backed by a little
* endian bit-mapped vector trie with a branching factor of 32.


32진 트리 구조는 Vector의 복잡도가 왜 "상수 시간" 대신 "효율적 상수 시간"인지 설명한다. Vector의 실제 복잡도는  log(32, N)이며, N은 벡터의 크기를 말한다. 이는 사실 상 상수 시간과 매우 근접하다고 말할 수 있다


Vector는 메모리가 32개의 엘리먼트 청크(chunk)로 할당되기 때문에 매우 큰 순서 집합을 저장하기에 좋은 선택이다. 


해당 청크는 트리의 모든 레벨로 미리 할당하지 않으며 필요할 때마다 할당된다.



간단한 테스트 코드이다.

val vectorEmpty = scala.collection.immutable.Vector.empty
val v = vectorEmpty :+ 1 :+ 2 :+ 3
println(v)
println(v(2))


결과는 다음과 같다.


Vector(1, 2, 3)

3



val is = collection.immutable.IndexedSeq(1, 2, 3)
// scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3)
val v2 = is :+ 4 :+ 5
println(v2)
println(v2(4))


결과는 다음과 같다.


Vector(1, 2, 3, 4, 5)

5




Vector의 2번째 엘리먼트를 구하는 함수는 다음과 같이 개발할 수 있다. 

def returnSecondElement[A](vector: Vector[A]): Option[A] = vector match {
case _ +: x +: _ => Some(x)
case _ => None
}
println(returnSecondElement(v))


결과는 Some(2)이다.








좀 더 내용을 살펴본다.

:+의 구현은 다음과 같다. 커리된 함수와 암시가 쓰였다. 


override def :+[B >: A, That](elem: B) (implicit bf: CanBuildFrom[Vector[A], B, That]): That =
if (isDefaultCBF(bf))
appendBack(elem).asInstanceOf[That]
else super.:+(elem)(bf)




appendBack 함수를 들어가보면, 32진 트리 구조임을 드러나는 코드(31 and 연산)이 나온다. 



또한 Vector를 새로 만들어서 리턴하고 있음을 보여준다. Vector는 Immutable이다. 

  private[immutable] def appendBack[B>:A](value: B): Vector[B] = {
// //println("------- append " + value)
// debug()
if (endIndex != startIndex) {
val blockIndex = endIndex & ~31
val lo = endIndex & 31

if (endIndex != blockIndex) {
//println("will make writable block (from "+focus+") at: " + blockIndex)
val s = new Vector(startIndex, endIndex + 1, blockIndex)
s.initFrom(this)
s.dirty = dirty
s.gotoPosWritable(focus, blockIndex, focus ^ blockIndex)
s.display0(lo) = value.asInstanceOf[AnyRef]
s
} ....


Posted by '김용환'
,

zeppelin 과 spark 연동

scala 2016. 10. 28. 14:32

IPython(노트북)과 R Studio의 큰 장점을 알기 때문에 Spark과 zeppelin을 연동했다


* Hive 연동 완료

http://knight76.tistory.com/entry/apache-zepplin-062-%EC%84%A4%EC%B9%98-hive-R-%EC%97%B0%EB%8F%99




Legacy Spark 연동할 수 있도록 수정한다. 제플린의 용도는 이문수님이 Spark 써밋에서 발표한 내용을 참고한다.





데몬을 실행하면, 2개의 데몬이 실행됨을 볼 수 있다. 하나는 zeppelin 이고, 하나는 spark 이다. 따라서 간단한 spark 코드를 실행할 수 있다. 기본 내장이다. 


$ ./bin/zeppelin-daemon.sh start 


 /usr/java/default/bin/java..... org.apache.zeppelin.server.ZeppelinServer

/usr/java/default/bin/java -cp ... /usr/local/zeppelin/interpreter/spark/zeppelin-spark_2.11-0.6.2.jar 







Legacy Hadoop과 연동할 수 있도록 Spark 를 구성한다. 


현재 zepplelin은 cpu 코어를 min, max 이렇게 사용하는 구조가 아니기 때문에(dedicate),  따로 zeppelin용 spark을 구성했다. (1대의 Driver와 4대의 executor를 설치) spark에서 hadoop에 접근할 수 있도록 hadoop 설정 정보를 포함시킨다. 


dirver의 8080 페이지로 접속해서 정상적으로 동작하는지 확인한다. 



 1.6.2 Spark Master at spark://master:7077



이제 zeppelin만 잘 설정하면 된다. 참고로 zeppelin의 conf/zeppelin-site.xml에 보면, spark 인터프리터가 존재한다.


<property>

  <name>zeppelin.interpreters</name>

  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter</value>

  <description>Comma separated interpreter configurations. First interpreter become a default</description>

</property>

..




conf/interpeter.json에서 spark.cores.max와 spark.executor.memory 잘 설정한다. 그냥 두면 기본 값을 사용하니. 병목이 발생할 수 있다. 관련 내용은 https://zeppelin.apache.org/docs/latest/interpreter/spark.html에 있으니 참조한다.



      "id": "2C1FUYTWJ",

      "name": "spark",

      "group": "spark",

      "properties": {

        "spark.executor.memory": "",

        "args": "",

        "zeppelin.spark.printREPLOutput": "true",

        "spark.cores.max": "32",




bin/common.sh에서 일부 수정해서 메모리 이슈가 없도록 수정한다. 


if [[ -z "${ZEPPELIN_MEM}" ]]; then

  export ZEPPELIN_MEM="-Xms3024m -Xmx5024m -XX:MaxPermSize=512m"

fi


if [[ -z "${ZEPPELIN_INTP_MEM}" ]]; then

  export ZEPPELIN_INTP_MEM="-Xms3024m -Xmx5024m -XX:MaxPermSize=512m"

fi




배시 환경에 따라 .bashrc에 다음을 추가한다.


export http_proxy=

export SPARK_HOME=





마지막으로 재시작한다. 


$ ./bin/zeppelin-daemon.sh restart 




노트 하나를 만들어 spark 코드가 잘 동작되는지 확인한다.


val textFile = sc.textFile("hdfs:///google/log/2016/10/23/00/*")
val count = textFile.count();
println(count)





노트북 관련 좋은 데모를 소개한다.


https://github.com/uosdmlab/playdata-zeppelin-notebook

Posted by '김용환'
,