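Spark provides two APIs for keeping the result of a computation around, cache() and persist(); which one to use depends on the storage level you need.
The following example caches an RDD.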
scala> val c = sc.parallelize(List("samuel"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> c.getStorageLevel
res0: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
scala> c.cache
res1: c.type = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> c.getStorageLevel
res2: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)
cache() always uses the default storage level, MEMORY_ONLY.
Next, a persist() example. persist() can be called with any of the storage levels below.
Here, SER means serialized. Data stored on disk goes to the local disk.
* Main storage levels
Level | Space used | CPU time | In memory | On disk | Serialized |
MEMORY_ONLY | High | Low | Y | N | N |
MEMORY_ONLY_SER | Low | High | Y | N | Y |
MEMORY_AND_DISK | High | Medium | Some | Some | Some |
MEMORY_AND_DISK_SER | Low | High | Some | Some | Y |
DISK_ONLY | Low | High | N | Y | Y |
scala> import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StorageLevel
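// c here is a newly created RDD (note ParallelCollectionRDD[1] below);
// an RDD that was already cached cannot be given a different storage level unless it is unpersisted first.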
scala> c.persist(StorageLevel.MEMORY_ONLY_SER)
res4: c.type = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> c.getStorageLevel
res5: org.apache.spark.storage.StorageLevel = StorageLevel(memory, 1 replicas)
scala> val c = sc.parallelize(List("samuel"), 2)
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:25
scala> c.persist(StorageLevel.MEMORY_AND_DISK)
res7: c.type = ParallelCollectionRDD[2] at parallelize at <console>:25
scala> c.getStorageLevel
res8: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)
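The transcript above assigns each storage level to a freshly created RDD. A minimal sketch of why that matters, assuming a spark-shell session where `sc` is already defined (the names `rdd`, `changed`, and `level` are only illustrative): Spark rejects an attempt to switch the level of an RDD that already has one, so the RDD has to be unpersisted before a different level is set.

```scala
import org.apache.spark.storage.StorageLevel
import scala.util.Try

// Assumes spark-shell, where `sc` (the SparkContext) is predefined.
val rdd = sc.parallelize(List("samuel"), 2)

// cache() assigns the default level, MEMORY_ONLY.
rdd.cache()

// Requesting a different level while one is already assigned throws
// an UnsupportedOperationException; Try captures it instead of aborting.
val changed = Try(rdd.persist(StorageLevel.MEMORY_ONLY_SER))

// unpersist() resets the level to NONE, after which a new level can be set.
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
val level = rdd.getStorageLevel
```

Here `changed` should be a `Failure`, and `level` should match `StorageLevel.MEMORY_ONLY_SER`.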
The official documentation describes the storage levels as follows.
https://spark.apache.org/docs/latest/rdd-programming-guide.html
Storage Level | Meaning
--------------------------------
MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER | Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY | Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
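To relate the level names in this table to the fields that getStorageLevel prints (memory, disk, deserialized, replicas), the flags on each StorageLevel constant can be inspected directly. This is a minimal sketch, assuming only that spark-core is on the classpath, as it is in spark-shell; the `levels` list and the output format are illustrative choices, not part of Spark.

```scala
import org.apache.spark.storage.StorageLevel

// Illustrative selection of the predefined levels from the table.
val levels = Seq(
  "MEMORY_ONLY"         -> StorageLevel.MEMORY_ONLY,
  "MEMORY_ONLY_SER"     -> StorageLevel.MEMORY_ONLY_SER,
  "MEMORY_AND_DISK"     -> StorageLevel.MEMORY_AND_DISK,
  "MEMORY_AND_DISK_SER" -> StorageLevel.MEMORY_AND_DISK_SER,
  "DISK_ONLY"           -> StorageLevel.DISK_ONLY,
  "MEMORY_ONLY_2"       -> StorageLevel.MEMORY_ONLY_2
)

// Print the flags that make up each level.
levels.foreach { case (name, l) =>
  println(s"$name: memory=${l.useMemory}, disk=${l.useDisk}, " +
    s"deserialized=${l.deserialized}, replicas=${l.replication}")
}
```

The `_SER` levels differ from their plain counterparts only in the `deserialized` flag, and the `_2` levels only in `replication`.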