Intellij에서 play2를 import했지만, scala object를 추가하기 어렵고 play스럽지 않아서 개발이 불편할 수 있다.


만약 아래와 같은 문구까지 나오면 다음 팁을 따른다.


Info: SBT compilation for play framework 2.x disabled by default





1) Intellij 설정 변경


Settings -> Langauges & Frameworks -> Play2 -> Compiler -> User Play 2 compiler for this project.



Info: SBT compilation for play framework 2.x disabled by default이 나오면 Intellij를 재시작한다.



2) 프로젝트 설정 변경

프로젝트에서 마우스 오른쪽 클릭 -> Add Framework Support -> Play2  추가.






Posted by '김용환'
,





scala의 collection에서 [error]  required: scala.collection.GenTraversableOnce[?] 에러가 나는 경우가 있다..




예)


scala> List(1, "x").flatten

<console>:12: error: No implicit view available from Any => scala.collection.GenTraversableOnce[B].

       List(1, "x").flatten




scala> List(1, "x").flatMap(a => a)

<console>:12: error: type mismatch;

 found   : Any

 required: scala.collection.GenTraversableOnce[?]

       List(1, "x").flatMap(a => a)

                                 ^



실제 api를 보면  다음과 같이 A => GenTraversableOnce라는 타입을 받는다. 


def flatten[B](implicit asTraversable: A => /*<:<!!!*/ GenTraversableOnce[B]): CC[B] = {


final override def flatMap[B, That](f: A => GenTraversableOnce[B])
                       (implicit bf: CanBuildFrom[List[A], B, That]): That = {



재미있는 것은 Option은 GenTraversableOnce으로 implicit으로 변환할 수 있다. 




이전에 에러를 수정하려면 다음처럼 수정하면 될 것이다.



scala> List(Some(1), Some("x"), None).flatten

res9: List[Any] = List(1, x)



scala> List(1, "x").flatMap(a => Some(a))

res8: List[Any] = List(1, x)



Posted by '김용환'
,


스칼라의 리스트(또는 배열)에서 특정 index의 값을 얻을 수 있지만 없는 Index에 접근하면 에러가 발생한다. 


scala> List(8, 9, 10)(0)

res5: Int = 8


scala> List(8, 9, 10)(3)

java.lang.IndexOutOfBoundsException: 3

  at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)

  at scala.collection.immutable.List.apply(List.scala:84)

  ... 48 elided




리스트(또는 배열)에서는 IndexOutOfBoundsException이 발생하지 않도록 Option을 리턴하는 lift를 제공한다. 


scala> List(8, 9, 10).lift

res0: Int => Option[Int] = <function1>


scala> List(8, 9, 10).lift(1)

res1: Option[Int] = Some(9)


scala> List(8, 9, 10).lift(2)

res2: Option[Int] = Some(10)


scala> List(8, 9, 10).lift(3)

res3: Option[Int] = None


scala> List(8, 9, 10).lift(4)

res4: Option[Int] = None






만약 None이라도 getOrElse를 통해 값을 얻을 수 있다.


scala> List(8, 9, 10).lift(10).getOrElse(0).toInt

res10: Int = 0


scala> List(8, 9, 10).lift(1).getOrElse(0).toInt

res11: Int = 9





리스트(또는 배열)의 lift를 따로 올라가면 다음과 같다. PartialFunction이다. 


trait PartialFunction[-A, +B] extends (A => B) {
...

/** Turns this partial function into a plain function returning an `Option` result.
* @see Function.unlift
* @return a function that takes an argument `x` to `Some(this(x))` if `this`
* is defined for `x`, and to `None` otherwise.
*/
def lift: A => Option[B] = new Lifted(this)


PartialFunction[A, B]라 가정하면 PartialFunction에서 A타입을 받고 B 타입으로 올리는(lift)한다는 의미가 있다.



scala> val test: PartialFunction[Int, Int] = { case i if i == 0 => 0 ; case _ => -1}

test: PartialFunction[Int,Int] = <function1>


scala> test.lift(0)

res15: Option[Int] = Some(0)


scala> test.lift(11)

res16: Option[Int] = Some(-1)



Posted by '김용환'
,


https://databricks.com/blog/2016/05/11/apache-spark-2-0-technical-preview-easier-faster-and-smarter.html


좋은 spark 2.0 소개 자료가 있다.



성능이 월등히 좋아졌다.







primitiveSpark 1.6Spark 2.0
filter15ns1.1ns
sum w/o group14ns0.9ns
sum w/ group79ns10.7ns
hash join115ns4.0ns
sort (8-bit entropy)620ns5.3ns
sort (64-bit entropy)620ns40ns
sort-merge join750ns700ns






API가 좋아졌다.

DataFrame이 쓰기 편해졌고, HiveContext(SQLContext) 대신 SparkSession이 새로 추가되었다. 




  • Unifying DataFrames and Datasets in Scala/Java: Starting in Spark 2.0, DataFrame is just a type alias for Dataset of Row. Both the typed methods (e.g. mapfiltergroupByKey) and the untyped methods (e.g. selectgroupBy) are available on the Dataset class. Also, this new combined Dataset interface is the abstraction used for Structured Streaming. Since compile-time type-safety in Python and R is not a language feature, the concept of Dataset does not apply to these languages’ APIs. Instead, DataFrame remains the primary programing abstraction, which is analogous to the single-node data frame notion in these languages. Get a peek from a Dataset API notebook.
  • SparkSession: a new entry point that replaces the old SQLContext and HiveContext. For users of the DataFrame API, a common source of confusion for Spark is which “context” to use. Now you can use SparkSession, which subsumes both, as a single entry point, as demonstrated in this notebook. Note that the old SQLContext and HiveContext are still kept for backward compatibility.
  • Simpler, more performant Accumulator API: We have designed a new Accumulator API that has a simpler type hierarchy and support specialization for primitive types. The old Accumulator API has been deprecated but retained for backward compatibility
  • DataFrame-based Machine Learning API emerges as the primary ML API: With Spark 2.0, the spark.ml package, with its “pipeline” APIs, will emerge as the primary machine learning API. While the original spark.mllib package is preserved, future development will focus on the DataFrame-based API.
  • Machine learning pipeline persistence: Users can now save and load machine learning pipelines and models across all programming languages supported by Spark.
  • Distributed algorithms in R: Added support for Generalized Linear Models (GLM), Naive Bayes, Survival Regression, and K-Means in R.


Posted by '김용환'
,


Spark에서 Executor 또는 Driver의 메모리가 너무 적으면 OutOfMemoryError가 발생할 수 있다. 


WARN TaskSetManager: Lost task 68.0 in stage 9.0 (TID 510, story-hadoop-dn11.dakao.io): java.lang.OutOfMemoryError: GC overhead limit exceeded

	at java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:967)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1785)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:122)
	at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:127)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)



Spark는 모두 메모리에 올리기 때문에. 이런 현상이 자주 발생될 수 있는데, 


해결하기 위해서는 Hdfs 파일의 크기(압축이라면 이에 맞게 *8 또는 *10해서 대략 예측 크기를 고려한다)


core 개수와 driver, executer 메모리도 이에 맞게 늘리면 더 이상 문제가 발생하지 않는다. 

'scala' 카테고리의 다른 글

[scala] List와 Array의 lift 메소드  (0) 2017.03.04
[펌] spark 2.0 소개(성능)  (0) 2017.03.02
[spark] spark summit 자료  (0) 2017.02.22
[scala] Array.transpose 예시  (0) 2017.02.17
[spark1.6] rdd를 dataframe으로 만드는 방법  (0) 2017.02.14
Posted by '김용환'
,

[spark] spark summit 자료

scala 2017. 2. 22. 23:38


spark summit 자료는 잘 공개되어 있다. 세션 뿐 아니라 튜터리얼 까지도. ㄷ ㄷ 




https://spark-summit.org/2016/schedule/



https://spark-summit.org/east-2017/schedule/




'scala' 카테고리의 다른 글

[펌] spark 2.0 소개(성능)  (0) 2017.03.02
[spark] spark의 OutOfMemoryError 발생시  (0) 2017.02.24
[scala] Array.transpose 예시  (0) 2017.02.17
[spark1.6] rdd를 dataframe으로 만드는 방법  (0) 2017.02.14
[spark] foreachPartition 예시  (0) 2017.02.14
Posted by '김용환'
,



스칼라의 Array의 transpose는 행렬 연산과 관련된 메소드이다. 


M X N의 행렬을 N X M의 행렬로 변경한다.



<코드 #1>


val x = Array(Array(1,2), Array(3,4))

val z = x.transpose


<결과 #1>

x: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4))
z: Array[Array[Int]] = Array(Array(1, 3), Array(2, 4))




<코드 #2>

val x = Array(Array(1,2), Array(3,4), Array(5,6))

val z = x.transpose


<결과 #2>

x: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6))
z: Array[Array[Int]] = Array(Array(1, 3, 5), Array(2, 4, 6))



<코드 #3>


val a = Map (

   10 -> List("20x", "10y", "10z"),

   29 -> List("29a", "29b", "29c")

)


val b = a.map { case (k, v) =>

  v.map(k -> _)

}


val c = b.toList


val d = c.transpose


val e = d.map(_.toMap)


<결과 #4>

a: scala.collection.immutable.Map[Int,List[String]] = Map(10 -> List(20x, 10y, 10z), 29 -> List(29a, 29b, 29c))
b: scala.collection.immutable.Iterable[List[(Int, String)]] = List(List((10,20x), (10,10y), (10,10z)), List((29,29a), (29,29b), (29,29c)))
c: List[List[(Int, String)]] = List(List((10,20x), (10,10y), (10,10z)), List((29,29a), (29,29b), (29,29c)))
d: List[List[(Int, String)]] = List(List((10,20x), (29,29a)), List((10,10y), (29,29b)), List((10,10z), (29,29c)))
e: List[scala.collection.immutable.Map[Int,String]] = List(Map(10 -> 20x, 29 -> 29a), Map(10 -> 10y, 29 -> 29b), Map(10 -> 10z, 29 -> 29c))



<코드 #5>

val test = """ 

a 1

b 2

"""


val a = test.split('\n').filter(!_.trim.isEmpty)

val b = a.map(_.split("[ ]+"))

val c = b.transpose

val d = c.map(_.mkString("x"))


<결과 #5>


a: Array[String] = Array(a 1, b 2)
b: Array[Array[String]] = Array(Array(a, 1), Array(b, 2))
c: Array[Array[String]] = Array(Array(a, b), Array(1, 2))
d: Array[String] = Array(axb, 1x2)




Posted by '김용환'
,



rdd를 dataframe으로 만드는 방법 (1.6)


1) SQLContext를 사용하는 방법


val sqlContext = new SQLContext(sc) 

import sqlContext.implicits._

rdd.toDF()





2) HiveContext를 이용해 DataFrame.createDataframe 이용


import scala.io.Source
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext 
import org.apache.spark.sql.hive.HiveContext
   
val peopleRDD = sc.textFile(filename)

val schemaString = "name age"

val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

val sqlContext = new HiveContext(sc)

val peopleDF = sqlContext.createDataFrame(rowRDD, schema)
peopleDF.registerTempTable("people")

val results = sqlContext.sql("SELECT name FROM people")
results.collect().foreach(println)




'scala' 카테고리의 다른 글

[spark] spark summit 자료  (0) 2017.02.22
[scala] Array.transpose 예시  (0) 2017.02.17
[spark] foreachPartition 예시  (0) 2017.02.14
[zepplin] 여러 spark context 사용하기  (0) 2017.02.14
scala에서 uuid 생성하는 방법  (0) 2017.02.09
Posted by '김용환'
,




foreach 와 foreachPartition의 차이


foreach는 간단히 collection을 리스트로 출력한다.


val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

list.foreach(x => println(x))




foreachPartition는 RDD를 partition처리한다. 

partition 개수는 foreachPartition 전에 미리 지정되야 한다. 하지만 리턴 값이 필요없을 때 사용된다.


val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)

b.foreachPartition(x => println(x.reduce(_ + _))) 





리턴 값이 필요할 때는 mapPartition 또는 mapPartitionsWithIndex을 사용한다. 


val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)

b.foreachPartition(x => println(x.reduce(_ + _)))

val mapped = b.mapPartitionsWithIndex {

    (index, iterator) => {

        println("aaaa -> " + index)

        val myList = iterator.toList

        myList.map(x => x + " -> " + index).iterator

    }

}


mapped.collect()


res41: Array[String] = Array(1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0, 6 -> 1, 7 -> 1, 8 -> 1, 9 -> 1, 10 -> 1)



데이터 프레임도 사용할 수 있다.


       dataframes.mapPartitions(_ grouped 10).foreach { batch =>

       val accountWithSep =

        batch.map {

        case Row(accountId: Int) => accountId.toString()

      }.mkString(",")

    




Posted by '김용환'
,

zepplin에서 spark을 테스트하다가 아래와 같은 에러를 만날 수 있다. 

여러 spark context가 쓰인 이유인데, allowMultipleContext를 true로 설정하면 문제가 없다. 



org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:


문제 해결


  val sparkConf = new SparkConf()

    .setAppName("abc")

    .set("spark.driver.allowMultipleContexts", "true");







Posted by '김용환'
,