Spark의 Task는 하나의 partition을 가진다.
SparkContext의 parallelize를 실행해서 hadoop HDFS에 데이터를 저장할 때, 병렬(spark core) 개수만큼 파티션이 생긴다. 전문 용어로 level of parallelism이라 한다 (hadoop에서도 reduce 개수만큼 파티션 개수가 생긴다)
HDFS에 저장할 용량이 크지 않다면 spark core 개수와 상관없이 하나의 파티션 파일로 모아두는 것이 좋을 수 있다.(전문용어로 aggregation 이라 한다)
이를 위해 repartition 또는 coalesce를 사용할 수 있다.
(coalesce는 random shuffle을 하지 않고, repartition은 random shuffle을 하기 때문에 성능 상 차이점이 있다)
repartition을 적용하니 잘 작동한다. hadoop의 HDFS에 하나의 파일로 모아졌다.
sc.parallelize(birthdayUsers.map(x => s"${person.id}\t${person.name}")).repartition(1)
* 참고 : 문서에 따르면 aggregation하는 방법은 3가지로 있다.
http://spark.apache.org/docs/latest/programming-guide.html
* 소스 비교
repartition은 coalesce에서 shuffle만 한 것이다.
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
coalesce 소스이다.
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values
} else {
new CoalescedRDD(this, numPartitions)
}
}
소스를 살펴본 것은
repartition은 네트워크 부하는 올라가지만 무난하게 사용할 수 있는데, coalesce는 동작은 하는데, 100kb보다 크다는 warning이 발생하였지만 잘 동작했기 때문이다.
WARN TaskSetManager: Stage 1 contains a task of very large size (XXXX). The maximum recommended task size is 100 KB.
문서를 보면, numPartitions=1일 때 계산 비용이 높아질 수 있다고 하며 shuffle을 true로 설정하라 한다.
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
'scala' 카테고리의 다른 글
[zepplin] 여러 spark context 사용하기 (0) | 2017.02.14 |
---|---|
scala에서 uuid 생성하는 방법 (0) | 2017.02.09 |
[scala] List concatenation 리스트 결합 예시 (0) | 2017.01.11 |
[scala] zip/unzip 예시 (0) | 2017.01.10 |
[scala] () => A라는 형식의 Thunk 예시 (0) | 2017.01.04 |