'2017/08/01'에 해당되는 글 1건

  1. 2017.08.01 [spark2] cache()와 persist()의 차이

Spark에서는 연산할 때 스토리 레벨에 따라 지원하는 api, cache()와 persist()가 존재한다. 



RDD에 cache를 저장한 예제를 살펴본다.

scala> val c = sc.parallelize(List("samuel"), 2)

c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> c.getStorageLevel

res0: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)


scala> c.cache

res1: c.type = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> c.getStorageLevel

res2: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)




cache()는 기본 저장소 레벨이 MEMORY_ONLY로만으로 사용된다. 





이번에는 persist() 예제를 진행한다. persist()는 다음 스토리지 레벨에 맞게 사용할 수 있다. 

여기에서 SER은 serialized을 의미한다. disk 저장 위치는 로컬이다. 


* 크게 분류된 스토리지 레벨(Storage Level)


Level

Space used

cpu time 

In memory 

 On disk

Serialized 

MEMORY_ONLY 

High

Low 

 N

MEMORY_ONLY_SER

Low 

High 

 N Y

MEMORY_AND_DISK

High

Medium 

Some 

 Some Some

MEMORY_AND_DISK_SER 

Low 

High 

Some 

 Some Y

DISK_ONLY 

Low 

High 


 Y Y



scala> import org.apache.spark.storage.StorageLevel;

import org.apache.spark.storage.StorageLevel


// 기존 c를 활용.


scala> c.persist(StorageLevel.MEMORY_ONLY_SER)

res4: c.type = ParallelCollectionRDD[1] at parallelize at <console>:24


scala> c.getStorageLevel

res5: org.apache.spark.storage.StorageLevel = StorageLevel(memory, 1 replicas)





scala> val c = sc.parallelize(List("samuel"), 2)

c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:25


scala> c.persist(StorageLevel.MEMORY_AND_DISK)

res7: c.type = ParallelCollectionRDD[2] at parallelize at <console>:25


scala> c.getStorageLevel

res8: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)



참고로 rdd에 persist를 사용하고 다시 persist를 사용하면 에러가 발생한다.

scala> c.persist(StorageLevel.MEMORY_AND_DISK)
java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
  at org.apache.spark.rdd.RDD.persist(RDD.scala:169)
  at org.apache.spark.rdd.RDD.persist(RDD.scala:194)
  ... 48 elided




스토리지 레벨에 대한 공식 문서 내용은 다음과 같다. 

https://spark.apache.org/docs/latest/rdd-programming-guide.html


Storage Level         | Meaning

--------------------------------

MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

MEMORY_AND_DISK  |  Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

MEMORY_ONLY_SER  |  Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

DISK_ONLY         | Store the RDD partitions only on disk.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.  | Same as the levels above, but replicate each partition on two cluster nodes.

OFF_HEAP (experimental)  | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.





원본 데이터 저장과 serialized의 성능 차이는 어느 개발자가 써놓은 내용이 있다..

출처 : http://sujee.net/wp-content/uploads/2015/01/spark-caching-1.png





또한, persist를 잘못 사용하면 애플리케이션이 종료해도 메모리를 계속 사용하는 문제가 발생할 수 있다고 한다.

출처 : http://tomining.tistory.com/84




따라서 애플리케이션 내에서 persist()호출이 된 rdd에 unpersist()를 호출해야 한다.!

scala> c.unpersist()
res10: c.type = ParallelCollectionRDD[2] at parallelize at <console>:25



Posted by '김용환'
,