[scala] Vector

scala 2016. 10. 28. 17:54



Java의 Vector와 달리 Scala의 Vector의 오퍼레이션은 거의 상수 시간대의 성능이 나올 정도로 훌륭하다. (정확히 말하면 효율적 상수 시간이다)


http://docs.scala-lang.org/overviews/collections/performance-characteristics.html



                        headtailapplyupdateprependappendinsert
VectoreCeCeCeCeCeC-



Vector는 Immutable 컬렉션이고, 트라이(https://en.wikipedia.org/wiki/Trie)라 부르는 순서가 있는 트리 데이터 구조로 구현되어 있다. 트라이에서, 키는 Vector에 저장된 값의 인덱스이다.




Vector의 구현은 패리티(parity) 32의 트리 구조이다. 각 노드는 32개의 배열로 구현되고, 


자식 노드 참조를 32개까지 저장하거나, 32개까지 값을 저장할 수 있다. 


* 주석을 보면, 다음과 같이 설명되어 있다. 

It is backed by a little
* endian bit-mapped vector trie with a branching factor of 32.


32진 트리 구조는 Vector의 복잡도가 왜 "상수 시간" 대신 "효율적 상수 시간"인지 설명한다. Vector의 실제 복잡도는  log(32, N)이며, N은 벡터의 크기를 말한다. 이는 사실 상 상수 시간과 매우 근접하다고 말할 수 있다


Vector는 메모리가 32개의 엘리먼트 청크(chunk)로 할당되기 때문에 매우 큰 순서 집합을 저장하기에 좋은 선택이다. 


해당 청크는 트리의 모든 레벨로 미리 할당하지 않으며 필요할 때마다 할당된다.



간단한 테스트 코드이다.

val vectorEmpty = scala.collection.immutable.Vector.empty
val v = vectorEmpty :+ 1 :+ 2 :+ 3
println(v)
println(v(2))


결과는 다음과 같다.


Vector(1, 2, 3)

3



val is = collection.immutable.IndexedSeq(1, 2, 3)
// scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3)
val v2 = is :+ 4 :+ 5
println(v2)
println(v2(4))


결과는 다음과 같다.


Vector(1, 2, 3, 4, 5)

5




Vector의 2번째 엘리먼트를 구하는 함수는 다음과 같이 개발할 수 있다. 

def returnSecondElement[A](vector: Vector[A]): Option[A] = vector match {
case _ +: x +: _ => Some(x)
case _ => None
}
println(returnSecondElement(v))


결과는 Some(2)이다.








좀 더 내용을 살펴본다.

:+의 구현은 다음과 같다. 커리된 함수와 암시가 쓰였다. 


override def :+[B >: A, That](elem: B) (implicit bf: CanBuildFrom[Vector[A], B, That]): That =
if (isDefaultCBF(bf))
appendBack(elem).asInstanceOf[That]
else super.:+(elem)(bf)




appendBack 함수를 들어가보면, 32진 트리 구조임을 드러나는 코드(31 and 연산)이 나온다. 



또한 Vector를 새로 만들어서 리턴하고 있음을 보여준다. Vector는 Immutable이다. 

  private[immutable] def appendBack[B>:A](value: B): Vector[B] = {
// //println("------- append " + value)
// debug()
if (endIndex != startIndex) {
val blockIndex = endIndex & ~31
val lo = endIndex & 31

if (endIndex != blockIndex) {
//println("will make writable block (from "+focus+") at: " + blockIndex)
val s = new Vector(startIndex, endIndex + 1, blockIndex)
s.initFrom(this)
s.dirty = dirty
s.gotoPosWritable(focus, blockIndex, focus ^ blockIndex)
s.display0(lo) = value.asInstanceOf[AnyRef]
s
} ....


Posted by '김용환'
,

zeppelin 과 spark 연동

scala 2016. 10. 28. 14:32

IPython(노트북)과 R Studio의 큰 장점을 알기 때문에 Spark과 zeppelin을 연동했다


* Hive 연동 완료

http://knight76.tistory.com/entry/apache-zepplin-062-%EC%84%A4%EC%B9%98-hive-R-%EC%97%B0%EB%8F%99




Legacy Spark 연동할 수 있도록 수정한다. 제플린의 용도는 이문수님이 Spark 써밋에서 발표한 내용을 참고한다.





데몬을 실행하면, 2개의 데몬이 실행됨을 볼 수 있다. 하나는 zeppelin 이고, 하나는 spark 이다. 따라서 간단한 spark 코드를 실행할 수 있다. 기본 내장이다. 


$ ./bin/zeppelin-daemon.sh start 


 /usr/java/default/bin/java..... org.apache.zeppelin.server.ZeppelinServer

/usr/java/default/bin/java -cp ... /usr/local/zeppelin/interpreter/spark/zeppelin-spark_2.11-0.6.2.jar 







Legacy Hadoop과 연동할 수 있도록 Spark 를 구성한다. 


현재 zepplelin은 cpu 코어를 min, max 이렇게 사용하는 구조가 아니기 때문에(dedicate),  따로 zeppelin용 spark을 구성했다. (1대의 Driver와 4대의 executor를 설치) spark에서 hadoop에 접근할 수 있도록 hadoop 설정 정보를 포함시킨다. 


dirver의 8080 페이지로 접속해서 정상적으로 동작하는지 확인한다. 



 1.6.2 Spark Master at spark://master:7077



이제 zeppelin만 잘 설정하면 된다. 참고로 zeppelin의 conf/zeppelin-site.xml에 보면, spark 인터프리터가 존재한다.


<property>

  <name>zeppelin.interpreters</name>

  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter</value>

  <description>Comma separated interpreter configurations. First interpreter become a default</description>

</property>

..




conf/interpeter.json에서 spark.cores.max와 spark.executor.memory 잘 설정한다. 그냥 두면 기본 값을 사용하니. 병목이 발생할 수 있다. 관련 내용은 https://zeppelin.apache.org/docs/latest/interpreter/spark.html에 있으니 참조한다.



      "id": "2C1FUYTWJ",

      "name": "spark",

      "group": "spark",

      "properties": {

        "spark.executor.memory": "",

        "args": "",

        "zeppelin.spark.printREPLOutput": "true",

        "spark.cores.max": "32",




bin/common.sh에서 일부 수정해서 메모리 이슈가 없도록 수정한다. 


if [[ -z "${ZEPPELIN_MEM}" ]]; then

  export ZEPPELIN_MEM="-Xms3024m -Xmx5024m -XX:MaxPermSize=512m"

fi


if [[ -z "${ZEPPELIN_INTP_MEM}" ]]; then

  export ZEPPELIN_INTP_MEM="-Xms3024m -Xmx5024m -XX:MaxPermSize=512m"

fi




배시 환경에 따라 .bashrc에 다음을 추가한다.


export http_proxy=

export SPARK_HOME=





마지막으로 재시작한다. 


$ ./bin/zeppelin-daemon.sh restart 




노트 하나를 만들어 spark 코드가 잘 동작되는지 확인한다.


val textFile = sc.textFile("hdfs:///google/log/2016/10/23/00/*")
val count = textFile.count();
println(count)





노트북 관련 좋은 데모를 소개한다.


https://github.com/uosdmlab/playdata-zeppelin-notebook

Posted by '김용환'
,



스칼라 언어가 sugar syntax가 많이 있는 반면, Option에 대한 에러 처리 부분은 친절하지 않다. Option은 Some과 None만 있는 ADT이지 Throwable이나 Exception와 값을 갖지 못한다. 


따라서 scalaz의 Either 라이브러리(Either[Exception, Int])  또는 disjunction(Exception \/ Int) 라이브러리를 살펴봐야 한다. 


(나중에 공부해야지)


http://eed3si9n.com/learning-scalaz/Either.html



(참고) http://danielwestheide.com/blog/2013/01/02/the-neophytes-guide-to-scala-part-7-the-either-type.html

import scala.util.control.Exception.catching def handling[Ex <: Throwable, T](exType: Class[Ex])(block: => T): Either[Ex, T] = catching(exType).either(block).asInstanceOf[Either[Ex, T]]



http://appliedscala.com/blog/2016/scalaz-disjunctions


def queryNextNumber: Exception \/ Long = { val source = Math.round(Math.random * 100) if (source <= 60) \/.right(source) else \/.left(new Exception("The generated number is too big!")) }

'scala' 카테고리의 다른 글

[scala] Vector  (0) 2016.10.28
zeppelin 과 spark 연동  (0) 2016.10.28
List에 적용하는 for yield 예시 2  (0) 2016.10.24
[scala] 정수의 산술 연산시 IllegalFormatConversionException 발생  (0) 2016.10.19
[scala] 특이한 Iterator  (0) 2016.10.19
Posted by '김용환'
,


for yield 첫번째 예시 다음 공부이다

http://knight76.tistory.com/entry/scala-for-%EB%AC%B8-yield



list에 for yield를 사용한 예시이다.


간단한 for 코드이다. if 문은 for에 존재할 수도 있고, for 문 블럭 안에 존재할 수 있다.

for {
i <- 1 to 4 //1,2,3,4
if i % 2 == 0
} print(i)
println

결과는 24이다.



그런데, 컬렉션(List)라면 상황이 좀 달라진다. 컬렉션으로 비슷하게 코딩하면 에러가 발생한다.


//  에러 : Error:(37, 3) '<-' expected but '}' found.
// for {
// i <- list
// i % 2 == 0
// } println(i)


다음처럼 바꿔야 잘 동작한다.

val list = List(1, 2, 3, 4)
for (i <- list) {
print(i)
}
println

결과는 다음과 같다.


1234




List에 for yield를 간단하게 사용할 수 있다.

print(for (i <- list) yield i * 2)
println

결과는 다음과 같다.


1234




여기에 if문을 추가한 코드이다. if문을 안에 넣을 수 있지만, for 문장에 포함시킬 수 있다.

for (i <- list) {
if (i % 2 == 0) print(i)
}
println

for (i <- list if i % 2 == 0) {
print(i)
}
println


결과는 동일하게 24이다.





for문은 내부적으로 sugar스럽게 사용할 수 있다. 이를 scala's for comprehension(for 내장, 오현석님이 이미 이렇게 번역하셔서;; )이라고 한다. 스칼라 컴파일러가 for 코드를 내부적으로 flatMap 또는 forEach로 변경됨을 의미하는 개념이라 할 수 있다. 



// scala's comprehension
println(for(x <- List(1) ; y <- List(1,2,3)) yield (x,y))
println(List(1).flatMap(x => List(1,2,3).map(y => (x,y))))



결과는 다음과 같다.


List((1,1), (1,2), (1,3))

List((1,1), (1,2), (1,3))




Posted by '김용환'
,


자바와 동일하게 스칼라의 정수의 산술 연산은 타입을 따라간다. 




아래와 같은 스칼라 코드는 IllegalFormatConversionException이 발생된다. 

"%1.2f".format(uvValue * 100 / 8423179))


포맷은 float를 원하는데, 실제 값은 Integer이기 때문이다.


java.util.IllegalFormatConversionException: f != java.lang.Integer




따라서, 자바에서처럼 형 변환을 해줘야 한다. 


"%1.2f".format(uvValue.toFloat * 100 / 8423179))


아니면, 변수 선언시 미리 타입을 설정하는 방법도 좋다. 



val uvValue:Float = 30123
println("%1.2f".format(uvValue * 100 / 8423179))



Posted by '김용환'
,

[scala] 특이한 Iterator

scala 2016. 10. 19. 11:35


스칼라의 Iterator는 불변이 아닌 가변(mutable)이다.  마치 포인터처럼 내부 인덱스을 가지고 있기 때문에 잘 알아야 할 필요가 있다. 


object Main extends App {
val list = List(1, 2, 3, 4, 5, 6)
val it = list.iterator
println(it.hasNext)
println(it.next)
println(it.next)
println(it.hasNext)
println(it.size)
println(it.hasNext)
println(it.size)
println(it.hasNext)
}

결과는 다음과 같다.


true

1

2

true

4

false

0

false




size를 호출한 이후, 완전히 이상해져버렸다. 



코드로 설명해본다. ^은 리스트의 인덱스를 가르킨다. 

size를 호출하면, 인덱스가 계산 후 끝으로 가버린다. 남은 크기를 리턴하지. 젙체 크기를 리턴하지 않는다. 


val list = List(1, 2, 3, 4, 5, 6)
val it = list.iterator
println(it.hasNext) // ^ 1 2 3 4 5 6
println(it.next) // 1 ^ 2 3 4 5 6
println(it.next) // 1 2 ^ 3 4 5 6
println(it.hasNext)
println(it.size) // 1 2 3 4 5 6 ^
println(it.hasNext) // false
println(it.size) // 0
println(it.hasNext) // false



문서에 보면, next와 hasNext를 제외하고는 Iterator 내부의 값이 바뀔 수 있다고 한다. 




http://www.scala-lang.org/api/current/index.html#scala.collection.Iterator

It is of particular importance to note that, unless stated otherwise, one should never use an iterator after calling a method on it. The two most important exceptions are also the sole abstract methods: next and hasNext.



Posted by '김용환'
,



Stream 객체를 사용할 때 Stream.empty에 대해서 패턴 매칭을 사용할 수 없다. 



다음 예시를 실행해보면, 에러가 발생한다.

val streamRange = Stream.range(0, 1)
streamRange match {
case Stream.empty => println("empty")
case _ => println("ok")
}



Error:(13, 17) stable identifier required, but scala.`package`.Stream.empty found.

    case Stream.empty => println("empty")





Stream.empty는 메소드이기 때문이다. Stream.empy 메소드의 값을 empty로 받고 패턴 매칭하면 제대로 동작한다.



val streamRange = Stream.range(0, 1)

val empty = Stream.empty
streamRange match {
case empty => println("empty")
case _ => println("ok")
}



결과는 다음과 같다.


empty








Posted by '김용환'
,





한 개의 튜플의 List 를 컬렉션 작업으로 생성했다. 

immutable.Map으로 변환하기 위해서 mutable.HashMap에. toMap을 호출하면 Map으로 생성된다.


//val results = collection 작업.. val tempMap = mutable.HashMap[String, Any]()
for (r <- results) {
tempMap.put(r._1, r._2)
}

tempMap.toMap


Posted by '김용환'
,



triple quotes(""")는 스칼라에서 매우 유용한 기능이다. 

왠만한 것은 다 문자열로 만들 수 있고 멀티 라인 문자열로 만들 수 있다. 자바에서 가장 취약했던 부분을 보완한 느낌이다.



\(역슬래시) 없이 "를 마음껏 쓸 수 있고, json도 편하게 사용할 수 있다.


println("""Hello "Water" World """)

결과는 다음과 같다. 


Hello "Water" World 






처음 봤을 때는 어이없지만, "만 출력하게 할 수 있다.


println(""""""")

결과는 다음과 같다.


"



println("""  {"name":"john"} """)

결과는 다음과 같다.

  {"name":"john"} 



간단한 수식도 사용할 수 있다. 스페이스 * 8 칸..

println(s"hello,${" " * 8}world")

결과는 다음과 같다.

hello,        world



변수도 사용할 수 있다. s를 주면 문자열 바깥의 내용을 참조할 수 있게 한다.


val abc = 111
println("""Hello number ${abc} """)
println(s"""Hello number ${abc} """)

결과는 다음과 같다.


Hello number ${abc} 

Hello number 111 




간단한 수식도 사용할 수 있다. 


val trueFalse = true
println(s"""${if (trueFalse) "true" else "false"} """)


결과는 다음과 같다.


true 





if문과 객체 내부도 접근해서 사용할 수 있다.

case class Comment(val comment_type: String)

val comment = Comment("sticker")
println(s"""${if (comment.comment_type.isEmpty) "X" else comment.comment_type} """)

결과는 다음과 같다.


sticker 




triple quote 앞에 f를 사용하면 문맥에 맞게 출력된다.

println(f"""c:\\user""")
println("""c:\\user""")

결과는 다음과 같다.


c:\user

c:\\user




스칼라는 변수명의 제한이 없는데 특수문자로 변수명으로 사용할 수 있고, 이를 triple quotes에서 사용할 수 있다. 

val `"` = "\""
println(s"${`"`}")

val % = "bbb"
//val `%` = "bbb" // 동일함
println(s"${`%`}")




멀티 라인도 지원한다.

val m =
"""Hi
|This is a String!"""
println(m)

결과는 다음과 같다.


Hi

      |This is a String!




공백 이슈가 있다. 이를 제거하려면 다음을 실행한다.


val s =
"""Hi
|This is a String!""".stripMargin
println(s)


결과는 다음과 같다.


Hi

This is a String!




간단하게 문자열 조작을 stripXXX로 실행할 수 있다. 

val s1 =
"""Hi
|This is a String!""".stripPrefix("Hi").stripSuffix("!").stripMargin
println(s1)

결과는 다음과 같다.



This is a String





"""안에 불필요한 \n\r\b 이런 것들을 몽땅 날리기 위해 StringContext.treatEscapes를 포함시켜 실행한다. 

val s2 =
StringContext.treatEscapes(
"""StringContext.
|treatEscapes!\r\n""".stripMargin)
println(s2)

결과는 다음과 같다.


StringContext.

treatEscapes!





간단히 함수로 묶을 수도 있다. 


def hello(name: String) =
s"""
|Hello, $name
|Come to with your family to World
""".stripMargin

println(hello("samuel"))



결과는 다음과 같다.


Hello, samuel

Come to with your family to World


Posted by '김용환'
,




Spark 1.5, 1.6에서는 json4s는 3.2.x만 쓸 수 있다. 

json4s를 작업하다가 다음과 같은 에러를 만났다. 



org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:333)

at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:332)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.flatMap(RDD.scala:332)

at stat.googleStat2$.run(CommentStat2.scala:29)


Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$

Serialization stack:

- object not serializable (class: org.json4s.DefaultFormats$, value: org.json4s.DefaultFormats$@2b999ee8)

- field (class: stat.CommentStat2$$anonfun$2, name: formats$1, type: class org.json4s.DefaultFormats$)

- object (class stat.CommentStat2$$anonfun$2, <function1>)

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)




아래와 같은 형태로 암시 formats를 재활용해서 쓰려 했는데.. 에러가 발생했다.


 implicit val formats = DefaultFormats

 

 

 class MyJob {

  implicit val formats = DefaultFormats


   RDD.map(x => x.extract[Double])

   .filter(y => y.extract[Int] == 18)

}





해결하려면, 다음처럼 formats를 각 메소드에서 선언해서 써야 한다.


class MyJob {

   RDD.map({x =>

     implicit val formats = DefaultFormats

     x.extract[Double]

    })

   ...

   .filter({ y =>

     implicit val formats = DefaultFormats

     y.extract[Int] == 18

    })


}



황당스럽지만, DefaultFormats의 슈퍼 타입인 Formats가 3.3부터 Serialziable을 상속받았다.

(DefaultFormats은 json4s 3.3부터 serialzable을 지원한다. )


trait Formats extends Serializable


https://github.com/json4s/json4s/commit/961fb27f5e69669fddc6bae77079a999fc6f04a1





하지만, Spark 1.5, 1.6을 쓰고 , json4s 3.2를 쓰는 사람 입장에서는 불편하지만 저렇게 써야 한다. 



Posted by '김용환'
,