kafka에서 스트림을 받아 spark stream으로 처리할 때, hdfs에 append를 할 수 없다. 


rdd에 있는 saveAsTextFile밖에 없다. 

rdd.saveAsTextFile(savePath)


대안으로 FileUtils.copyMerge가 있지만, stream 처리할 때는 쓸 수 없다.

org.apache.hadoop.fs.FileUtil 클래스

static boolean copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString)



아래처럼 사용할 수 있긴 한데..
def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
val hdfs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
if (!HdfsUtil.exists(dstPath)) HdfsUtil.mkdir(dstPath)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, ssc.sparkContext.hadoopConfiguration, null)
}

단점은 hadoop 3.0에서 사라졌다!!!


https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/release/3.0.0-alpha1/RELEASENOTES.3.0.0-alpha1.html

https://issues.apache.org/jira/browse/HADOOP-12967



Removed FileUtil.copyMerge.



해결 할 수 있는 방법으로 rdd를 Dataframe으로 바꾼 후 저장할 때 orc로 append할 수 있다.


https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html#copyMerge(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, boolean, org.apache.hadoop.conf.Configuration, java.lang.String)

stream
.flatMap(line => CustomLog.parse(line))
.repartition(1).foreachRDD { rdd =>
val df = rdd.map { log =>
(log.host, log.stdate, log.sttime)
}.toDF("host", "stdate", "sttime").coalesce(1)
.write.mode(SaveMode.Append).orc(hourPath)
}







Posted by '김용환'
,