HadoopRDD는 하둡 1.x 라이브러리의 맵리듀스 API를 사용해 HDFS에 저장된 데이터를 읽는 핵심 기능을 제공한다. 

그러나 일반 파일 시스템에서 읽을 때도 HadoopRDD가 사용된다. 


HadoopRDD는 기본적으로 사용되며 모든 파일 시스템의 데이터를 RDD로 로드할 때 HadoopRDD를 살펴볼 수 있다.




scala> val statesPopulationRDD = sc.textFile("test.csv")

statesPopulationRDD: org.apache.spark.rdd.RDD[String] = test.csv MapPartitionsRDD[27] at textFile at <console>:24


scala>  statesPopulationRDD.toDebugString

res22: String =

(2) test.csv MapPartitionsRDD[27] at textFile at <console>:24 []

 |  test.csv HadoopRDD[26] at textFile at <console>:24 []







NewHadoopRDD는 Hadoop 2.x 라이브러리의 새로운 맵리듀스 API를 사용해 HDFS, HBase 테이블, 아마존 S3에 저장된 데이터를 읽는 핵심 기능을 제공한다. NewHadoopRDD는 다양한 포맷으로 읽을 수 있기 때문에 여러 외부 시스템과 상호 작용하기 위해 사용된다.


스파크 컨텍스트의 wholeTextFiles 함수를 사용하여 WholeTextFileRDD를 생성하는 것이다. 실제로 WholeTextFileRDD는 다음 코드처럼 NewHadoopRDD를 상속한다.

scala> val rdd_whole = sc.wholeTextFiles("test.txt")

rdd_whole: org.apache.spark.rdd.RDD[(String, String)] = wiki1.txt MapPartitionsRDD[29] at wholeTextFiles at <console>:24


scala> rdd_whole.toDebugString

res23: String =

(1) test.txt MapPartitionsRDD[29] at wholeTextFiles at <console>:24 []

 |  WholeTextFileRDD[28] at wholeTextFiles at <console>:24 []




직접적으로 스파크 컨텍스트의 newAPIHadoopFile을 사용할 수 있다. 




import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat


import org.apache.hadoop.io.Text


val newHadoopRDD = sc.newAPIHadoopFile("test.txt", classOf[KeyValueTextInputFormat], classOf[Text],classOf[Text])






Posted by 김용환 '김용환'

댓글을 달아 주세요