hadoop의 위치를 /a/b/c라고 하고 DataFrame에서 partitionBy를 다음처럼 사용한다면,


.write.mode(SaveMode.Append).partitionBy("command", "subcommand").orc(hourPath)


hadoop의 디렉토리 구조가 된다. 


/a/b/c/command=add/action=abc








Posted by '김용환'
,



kafka에서 스트림을 받아 spark stream으로 처리할 때, hdfs에 append를 할 수 없다. 


rdd에 있는 saveAsTextFile밖에 없다. 

rdd.saveAsTextFile(savePath)


대안으로 FileUtils.copyMerge가 있지만, stream 처리할 때는 쓸 수 없다.

org.apache.hadoop.fs.FileUtil 클래스

static boolean copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString)



아래처럼 사용할 수 있긴 한데..
def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
val hdfs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
if (!HdfsUtil.exists(dstPath)) HdfsUtil.mkdir(dstPath)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, ssc.sparkContext.hadoopConfiguration, null)
}

단점은 hadoop 3.0에서 사라졌다!!!


https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/release/3.0.0-alpha1/RELEASENOTES.3.0.0-alpha1.html

https://issues.apache.org/jira/browse/HADOOP-12967



Removed FileUtil.copyMerge.



해결 할 수 있는 방법으로 rdd를 Dataframe으로 바꾼 후 저장할 때 orc로 append할 수 있다.


https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html#copyMerge(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, boolean, org.apache.hadoop.conf.Configuration, java.lang.String)

stream
.flatMap(line => CustomLog.parse(line))
.repartition(1).foreachRDD { rdd =>
val df = rdd.map { log =>
(log.host, log.stdate, log.sttime)
}.toDF("host", "stdate", "sttime").coalesce(1)
.write.mode(SaveMode.Append).orc(hourPath)
}







Posted by '김용환'
,




 missing EOF at '...' near ''hdfs://..' 에러는 Location 밑에 tblproperties이 없어서 에러가 발생할 수 있다.



 LOCATION 

 'hdfs://hadoop/goolgleplus/log'

 tblproperties ("orc.compress"="NONE")





external 테이블에 location과 stored as orc라는 프로퍼티를 함꼐 추가하면 에러가 발생할 수 있다.

Posted by '김용환'
,




spark context에 집합 오퍼레이션이 있다. 


scala> val rdd1 = sc.parallelize(List("Spark","Scala"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24


scala> val rdd2 = sc.parallelize(List("Akka","Scala"))

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[51] at parallelize at <console>:24




합집합


scala> rdd1.union(rdd2).collect()

res11: Array[String] = Array(Spark, Scala, Akka, Scala)



교집합.


scala> rdd1.intersection(rdd2).collect()

res12: Array[String] = Array(Scala)



카테시안


scala> rdd1.cartesian(rdd2).collect()

res13: Array[(String, String)] = Array((Spark,Akka), (Spark,Scala), (Scala,Akka), (Scala,Scala))



차집합(A-B)


scala> rdd1.subtract(rdd2).collect()

res14: Array[String] = Array(Spark)





join 함수는(K, V)와 (K, W)를 호출해  (K, (V, W))인 새로운 RDD를 생성한다.



scala> val hash1 = sc.parallelize( Seq(("1", "A"), ("2", "B"), ("3", "C"), ("1","D")))

hash1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[64] at parallelize at <console>:24


scala> val hash2 = sc.parallelize( Seq(("1", "W"), ("2", "X"), ("3", "Y"), ("2","Z")))

hash2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[65] at parallelize at <console>:24




scala> hash1.join(hash2).collect()

res15: Array[(String, (String, String))] = Array((1,(A,W)), (1,(D,W)), (2,(B,X)), (2,(B,Z)), (3,(C,Y)))




cogroup 함수는 (K, V)를 (K, Iterable<V>)로 변환한다.



scala> hash1.cogroup(hash2).collect()

res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(A, D),CompactBuffer(W))), (2,(CompactBuffer(B),CompactBuffer(X, Z))), (3,(CompactBuffer(C),CompactBuffer(Y))))

Posted by '김용환'
,


Intellij에서 컴파일할 때 org.jboss.interceptor#jboss-interceptor-api;1.1 not found 예외가 발생할 수 있다.


not found: org.jboss.interceptor#jboss-interceptor-api;1.1


원인은 Maven central repository의 SHA-1 정도가 달라서 나는 문제이어서 JBoss쪽 저장소를 추가하면 Intellij 컴파일이 될 것이다 



sbt가 0.13이라면 ~/.sbt/0.13/global.sbt 에 다음 내용을 추가한다


resolvers += "JBoss" at "https://repository.jboss.org/"




sbt가 0.13.10이라면 ~/.sbt/0.13.10/global.sbt 에 다음 내용을 추가한다


resolvers += "JBoss" at "https://repository.jboss.org/"

Posted by '김용환'
,




spark에서 특정 단어의 개수를 찾는 예시이다. 이 예시는 pv, uv를 뽑는데 도움이 되는 코드이다. 



예제


 https://kodejava.org/how-do-i-format-a-date-into-ddmmyyyy/를 참조했다.

$ cat  xxx.txt

     Date date = Calendar.getInstance().getTime();


        // Display a date in day, month, year format

        DateFormat formatter = new SimpleDateFormat("dd/MM/yyyy");

        String today = formatter.format(date);

        System.out.println("Today : " + today);


        // Display date with day name in a short format

        formatter = new SimpleDateFormat("EEE, dd/MM/yyyy");

        today = formatter.format(date);

        System.out.println("Today : " + today);


        // Display date with a short day and month name

        formatter = new SimpleDateFormat("EEE, dd MMM yyyy");

        today = formatter.format(date);

        System.out.println("Today : " + today);


        // Formatting date with full day and month name and show time up to

        // milliseconds with AM/PM

        formatter = new SimpleDateFormat("EEEE, dd MMMM yyyy, hh:mm:ss.SSS a");

        today = formatter.format(date);

        System.out.println("Today : " + today);




scala> val codes = sc.textFile("xxx.txt")

codes: org.apache.spark.rdd.RDD[String] = xxx.txt MapPartitionsRDD[1] at textFile at <console>:24


scala> val lower = codes.map( line =>line.toLowerCase)

lower: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26


scala> val words = lower.flatMap(line => line.split("\\s+"))

words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:28


scala> val counts = words.map(word => (word, 1))

counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:30


scala> val frequency = counts.reduceByKey(_ + _)

frequency: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:32


scala> val invFrequency = frequency.map(_.swap)

invFrequency: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[6] at map at <console>:34


scala> invFrequency.top(10).foreach(println)

(23,)

(9,=)

(6,date)

(5,//)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)



이를 다음처럼 축약해서 쓸 수 있다. 



scala> val result = sc.textFile("xxx.txt").map( line =>line.toLowerCase).flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap)

result: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[21] at map at <console>:24


scala> result.top(10).foreach(println)

(23,)

(9,=)

(6,date)

(5,//)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)





필요없는 코드는 다음처럼 stopWords를 만들어 필터링할 수 있다. 


scala> val stopWords = Set("", "=", "//", ")", "(", ";", ":", "+", "-", "\"")

stopWords: scala.collection.immutable.Set[String] = Set("", =, ), ", -, ;, //, +, (, :)


scala> val result = sc.textFile("xxx.txt").map( line =>line.toLowerCase).flatMap(line => line.split("\\s+")).filter(! stopWords.contains(_)).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap)

result: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[34] at map at <console>:26


scala> result.top(10).foreach(println)

(6,date)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)

(4,formatter)

(3,name)

(3,display)



Posted by '김용환'
,


RDD를 공부하다가 종종 다음 에러가 발생한다.


java.util.NoSuchElementException: None.get 



웹에서 확인해보니. 이미 Spark Jira에 올라와 있다..


https://issues.apache.org/jira/browse/SPARK-16599





17/03/14 20:33:06 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 33)

java.util.NoSuchElementException: None.get

at scala.None$.get(Option.scala:347)

at scala.None$.get(Option.scala:345)

at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)

at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 4.0 failed 1 times, most recent failure: Lost task 7.0 in stage 4.0 (TID 39, localhost, executor driver): java.util.NoSuchElementException: None.get

at scala.None$.get(Option.scala:347)

at scala.None$.get(Option.scala:345)

at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)

at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


Driver stacktrace:

  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)

  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)

  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)

  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)

  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)

  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)

  at scala.Option.foreach(Option.scala:257)

  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)

  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)

  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)

  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)

  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)

  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)

  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)

  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

  at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)

  ... 52 elided

Caused by: java.util.NoSuchElementException: None.get

  at scala.None$.get(Option.scala:347)

  at scala.None$.get(Option.scala:345)

  at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)

  at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)

  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)

  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

  at java.lang.Thread.run(Thread.java:745)




2017년 3월 15일에 cloudera의 sean owen이 pull request를 올렸다.. 잘 되면 좋을 듯 싶다.


https://github.com/apache/spark/pull/17290



소스 패치 내용

https://github.com/apache/spark/commit/27234e154db18cbc614053446713636a69046090


https://github.com/apache/spark/commit/5da4bcffa1b39ea8c83fe63a09e68297be371784


조만간에 패치되어 잘 동작할 듯하다..




Posted by '김용환'
,

다음과 같은 에러가 발생한다는 것은.. 실행시 SparkContext가 두 개 이상의 인스턴스가 있다는 의미이다. 



Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:





spark-shell을 실행할 때, 이미 SparkContext가 이미 생성되어 있다. 


scala> sc

res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@26a004ed


이 때 새로운 SparkContext를 생성할 때 해당 예외가 발생한다. 



scala> val conf = new SparkConf().setAppName("aaa")

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@64d42d3d


scala> val ssc = new StreamingContext(conf, Seconds(1))

org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)



StreamingContext를 기존의 sc로 바인딩하면 에러가 발생하지 않는다.


scala> val ssc = new StreamingContext(sc, Seconds(1))

ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@55ff64fd




만약 두 개의 SparkContext를 유지하려면, SparkConf에 다음 설정을 추가한다.


conf.set("spark.driver.allowMultipleContexts","true");



Posted by '김용환'
,


spark streaming submit하다가 다음과 같은 에러가 발생할 때가 있었다. 


The main method in the given main class must be static



https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L723



  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

   if (!Modifier.isStatic(mainMethod.getModifiers)) {

      throw new IllegalStateException("The main method in the given main class must be static")

   }



분명 spark은 scala여서 static이 없는데, 왜 이게 발생한 걸까?


내가 만든 코드에는 문제가 없어 보였다. 


http://docs.scala-lang.org/ko/tutorials/scala-for-java-programmers.html



  1. object HelloWorld {
  2. def main(args: Array[String]) {
  3. println("Hello, world!")
  4. }
  5. }

똑똑한 독자들은 이미 눈치챘겠지만 위의 예제에서 main 함수는 static이 아니다. Scala에는 정적 멤버(함수든 필드든)라는 개념이 아얘 존재하지 않는다. 클래스의 일부로 정적 멤버를 정의하는 대신에 Scala 프로그래머들은 정적이기 원하는 멤버들을 싱글턴 객체안에 선언한다.




아....


결국은 class로 정의한 클래스를 spark submit하다가 발생한 문제였다. 스칼라에서는 static 이라는 reserved word가 없지만, object는 내부적으로 생성한다.그래서 scala 코드에서 확인한 것이었다. 



class 를 object로 변경하니 제대로 동작한다. 


Posted by '김용환'
,


padTo 예제이다. 컬렉션의 길이만큼 없는 데이터는 디폴트 값으로 채운다. 


List("a", "b", "c").padTo(5, "-")

res0: List[String] = List(a, b, c, -, -)




리스트의 특정 데이터의 개수를 꼭 채워야 하는 경우가 있다. 로그 저장시 데이터가 없다고 해서 그냥 두기 보다 디폴트 값같은 개념을 둔다고 생각한다.


padTo를 적용한 Log case 클래스 예제이다. 




scala> case class Log(tokens: List[String]) {

     | val z1 = tokens(0)

     | val z2 = tokens(1)

     | val z3 = tokens(2)

     | val z4 = tokens(3)

     | val z5 = tokens(4)

     | }

defined class Log



scala> val l = new Log(List("a", "b", "c").padTo(5, "-"))

l: Log = Log(List(a, b, c, -, -))


scala> l.z1

res4: String = a


scala> l.z2

res5: String = b


scala> l.z3

res6: String = c


scala> l.z4

res7: String = -


scala> l.z5

res8: String = -



Posted by '김용환'
,