kafka에서 스트림을 받아 spark stream으로 처리할 때, hdfs에 append를 할 수 없다.
rdd에 있는 saveAsTextFile밖에 없다.
rdd.saveAsTextFile(savePath)
대안으로 FileUtils.copyMerge가 있지만, stream 처리할 때는 쓸 수 없다.
org.apache.hadoop.fs.FileUtil 클래스
static boolean copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString)
아래처럼 사용할 수 있긴 한데..
def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
val hdfs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
if (!HdfsUtil.exists(dstPath)) HdfsUtil.mkdir(dstPath)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, ssc.sparkContext.hadoopConfiguration, null)
}
단점은 hadoop 3.0에서 사라졌다!!!
https://issues.apache.org/jira/browse/HADOOP-12967
- HADOOP-12967 | Major | Remove FileUtil#copyMerge
Removed FileUtil.copyMerge.
해결 할 수 있는 방법으로 rdd를 Dataframe으로 바꾼 후 저장할 때 orc로 append할 수 있다.
stream
.flatMap(line => CustomLog.parse(line))
.repartition(1).foreachRDD { rdd =>
val df = rdd.map { log =>
(log.host, log.stdate, log.sttime)
}.toDF("host", "stdate", "sttime").coalesce(1)
.write.mode(SaveMode.Append).orc(hourPath)
}
'scala' 카테고리의 다른 글
[spark] dataframe 예제 (0) | 2017.03.15 |
---|---|
[spark] dataframe의 partitionby 사용시 hadoop 디렉토리 구조 (0) | 2017.03.15 |
[spark] 집합 함수 - union, intersection, cartesian, subtract, join, cogroup 예제 (0) | 2017.03.15 |
[spark] sbt 빌드시 - not found: org.jboss.interceptor#jboss-interceptor-api;1.1 에러 해결 (0) | 2017.03.15 |
[spark] RDD 테스트 - word count 예제 (0) | 2017.03.15 |