sbt는 스칼라 컴파일 툴이다.

의존성을 추가할 때 

%%를 사용하면 버전을 artifact 뒤에 추가한다. 스칼라는 아직 하위 호환성보다는 잘 만들어질 언어를 지향하기 때문인데..


그러나 자바 라이브러리를 쓰기 위해 %%를 하면 에러가 발생한다. 그 것은 자바는 버전 별로 라이브러리를 만들지 않기 때문이다. 



sbt.librarymanagement.ResolveException: unresolved dependency: org.apache.phoenix#phoenix-core_2.11;4.11.0-HBase-1.2: not found




scala lib은 %%를

java lib은 %를 사용하는 것에 익숙해질 필요가 있다. ㅠ(맨날 틀림.)

libraryDependencies ++= Seq(guice,
"org.scalatestplus.play" %% "scalatestplus-play" % "3.1.1" % "test",
"org.apache.phoenix" % "phoenix-core" % "4.11.0-HBase-1.2")


Posted by '김용환'
,


spark에서 groupByKey를 사용할 때 성능에 많이 떨어질 수 있다. 


좋은 설명을 포함한 링크가 있다.


https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html



groupByKey를 살펴보면, 키 값으로 분류를 하고 모든 계산을 하나씩 진행한다. 따라서 모든 데이터 복사가 많이 일어날 수 있다. 



반면 reduceByKey에서는 계산을 진행할때 데이터 셔플 전에 노드 내에서 조금 계산해놓는다. 따라서 불필요한 데이터가 전달되지 않기 때문에 네트웍 트래픽, 복사 비용이 groupByKey보다 줄어든다.





마치 map/reduce의 custom combiner와 비슷한 느낌으로 동작한다.




http://www.admin-magazine.com/HPC/Articles/MapReduce-and-Hadoop




Posted by '김용환'
,



RDD map을 사용 하기전에 특정 라인(예, 첫번째 라인)을 사용하고 싶지 않다면 다음과 같은 mapPartitionWithIndex()를 사용한다. 


rdd.mapPartitionsWithIndex(

(i, iterator) => if (i == 0) iterator.drop(1) else iterator)

예제는 다음과 같다.




scala> val rdd = sc.parallelize(List("samuel", "kyle", "jun", "ethan", "crizin"), 5)

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> rdd.mapPartitionsWithIndex((i, iterator) => if (i == 0) iterator.drop(1) else iterator).foreach(println)

kyle

crizin

ethan

jun


scala> rdd.mapPartitionsWithIndex((i, iterator) => if (i % 2 == 0) iterator.drop(1) else iterator).foreach(println)

kyle

ethan



Posted by '김용환'
,

[scala] Product 이해하기

scala 2017. 8. 10. 16:20

case class는 정말 scala/spark 코딩할 때 없으면 안되는 괜찮은 클래스이다.

정체를 알게 되면서 Product를 알게 되 었는 데 정리차 글을 정리한다. 




scala 컴파일러는 타입을 추론하는데..


아래 예제를 보면 Animal이라는 트레이트를 믹싱해서 사용하고 있는데, 

최종 결과를 보면 Product with Serializble with Animal이다. 



scala> trait Animal

defined trait Animal


scala> trait FurryAnimal extends Animal

defined trait FurryAnimal


scala> case class Dog(name:String) extends Animal

defined class Dog


scala> case class Cat(name:String) extends Animal

defined class Cat


scala> val x = Array(Dog("Fido"),Cat("Felix"))

x: Array[Product with Serializable with Animal] = Array(Dog(Fido), Cat(Felix))





만약 Animal 트레이트에 직접 Product를 상속하고 Serialable을 믹스인했다면.. 스칼라 컴파일러는 명확하게 Animal타입으로 인식한다. 


scala> trait Animal extends Product with Serializable

defined trait Animal


scala> case class Dog(name: String) extends Animal

defined class Dog


scala> case class Cat(name: String) extends Animal

defined class Cat


scala> Array(Dog("d"), Cat("c"))

res0: Array[Animal] = Array(Dog(d), Cat(c))




https://stackoverflow.com/a/36526557의 핵심 내용을 정리해본다. 


스칼라의 case class는 다음 특징을 갖고 있다. 


1. Product를 자동으로 상속한다.

2. Serializable을 상속한다

3. 패턴 매치에 쓰이기 위해 hashCode와 equals를 상속한다.

4. 타입 분해를 위해 apply와 unapply 메소드를 지원한다. 



case class는 ADT(Product)의 표현하는 방식이다. 

(그래서 분해 되고 패턴매칭 쓰고 Serializable되니 많이 사용될 수 밖에 없다)



참고로 case object 뿐 아니라 case object도 동일한 Product를 상속받은 스칼라 추론이 발생한다. 



scala> trait Animal

defined trait Animal


scala> case object Dog extends Animal

defined object Dog


scala> case object Cat extends Animal

defined object Cat


scala> val animals = List(Dog, Cat)

animals: List[Product with Serializable with Animal] = List(Dog, Cat)




현상은 case case와 동일하다. 


scala> trait Animal extends Product with Serializable

defined trait Animal


scala>  case object Dog extends Animal

defined object Dog


scala> case object Cat extends Animal

defined object Cat


scala>  val animals = List(Dog, Cat)

animals: List[Animal] = List(Dog, Cat)



Product 내부 코드를 살펴보면 다음과 같다.scala.Equals를 믹스인하고 있다.


package scala
trait Product extends scala.Any with scala.Equals {
def productElement(n : scala.Int) : scala.Any
def productArity : scala.Int
def productIterator : scala.Iterator[scala.Any] = { /* compiled code */ }
def productPrefix : java.lang.String = { /* compiled code */ }
}

scala.Equals는 다음과 같다. Product에는 equals를 갖고 있다. 

package scala
trait Equals extends scala.Any {
def canEqual(that : scala.Any) : scala.Boolean
def equals(that : scala.Any) : scala.Boolean
}


Product의 특징을 살펴본다. productArity, productElement, productPrefix, productIterator 메소드를 확인할 수 있다. 


scala> case class Member(id: Integer, lastname: String, firstName: String)

defined class Member


scala> val samuel = Member(1, "YongHwan", "Kim")

samuel: Member = Member(1,YongHwan,Kim)


scala> samuel.productArity

res0: Int = 3


scala> samuel.productElement(0)

res1: Any = 1


scala> samuel.productElement(1)

res2: Any = YongHwan


scala> samuel.productElement(2)

res3: Any = Kim


scala> samuel.productPrefix

res5: String = Member


scala> samuel.productIterator.foreach(println)

1

YongHwan

Kim




마지막 라인을 기억해두며..List와 Map도 모두 Product를 상속받았음을 알 수 있다. 

scala> val a = List("a", "b")
a: List[String] = List(a, b)

scala> a.productIterator.foreach(println)
a
List(b)

scala> val map = 1 -> "a"
map: (Int, String) = (1,a)

scala> map.productIterator.foreach(println)
1
a


좀 더 고급으로가면.. 스칼라 Reflection 코드도 만나게 된다. 결국 Product는 나름 상위 클래스로 사용되고 있음을 알려주는 코드라 할 수 있다. 참고로(ClassTag와 TypeTag는 Reflection 클래스이다.)

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala



 protected def withParquetFile[T <: Product: ClassTag: TypeTag]

      (data: Seq[T])

      (f: String => Unit): Unit = {

    withTempPath { file =>

      spark.createDataFrame(data).write.parquet(file.getCanonicalPath)

      f(file.getCanonicalPath)

    }

  }


Posted by '김용환'
,


spark 코딩을 할 때 깊이 생각안하고 대충 짠 것을 후회했다. 그냥 동작만 되길 바라면서 했던 것들이 많이 기억났다. 




spark의 coursera 강의 중 wide dependency와 narrow dependency에 대한 설명이 나오는데, 많은 영감을 주어서 잘 펌질해본다.





https://github.com/rohitvg/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies





Transformations with (usually) Narrow dependencies:

  • map
  • mapValues
  • flatMap
  • filter
  • mapPartitions
  • mapPartitionsWithIndex

Transformations with (usually) Wide dependencies: (might cause a shuffle)

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce










Posted by '김용환'
,


RDD에 partitonBy 메소드를 호출하면서 Partitioner를 정할 수 있다. 

기본 Partitioner(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/Partitioner.html)로는 HashPartitioner, RangePartitioner가 존재한다. 



우선 HashPartitioner를 사용한다. 파티셔닝을 해쉬로 퍼트릴 수 있기 때문에 유용하다. 



먼저 5개의 파티션으로 RDD를 생성했다가 Partitioning을 3개의 HashPartitioner를 사용하는 예제이다. 




scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)), 5)

pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24


scala> pairs.partitioner

res1: Option[org.apache.spark.Partitioner] = None


scala> import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner


scala> val partitioned = pairs.partitionBy(new HashPartitioner(3)).persist()

partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at <console>:27


scala> partitioned.collect

res2: Array[(Int, Int)] = Array((2,2), (1,1), (3,3))


scala> pairs.partitions.length

res7: Int = 5


scala> partitioned.partitions.length

res8: Int = 3


scala> pairs.partitions

res5: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@6ba, org.apache.spark.rdd.ParallelCollectionPartition@6bb, org.apache.spark.rdd.ParallelCollectionPartition@6bc, org.apache.spark.rdd.ParallelCollectionPartition@6bd, org.apache.spark.rdd.ParallelCollectionPartition@6be)


scala> partitioned.partitions

res6: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ShuffledRDDPartition@0, org.apache.spark.rdd.ShuffledRDDPartition@1, org.apache.spark.rdd.ShuffledRDDPartition@2)



persist()는 shuffle을 이미 되도록 해놓기 때문에 성능상 이점을 가진다. 실무에서 사용할 때 유용한 팁이다. 



참고로 RDD.toDebugString() 메소드가 존재하는데 shuffle RDD인지 아닌지를 파악할 때 도움이 된다. 



scala> partitioned.toDebugString

res11: String =

(3) ShuffledRDD[8] at partitionBy at <console>:27 [Memory Deserialized 1x Replicated]

 |       CachedPartitions: 3; MemorySize: 192.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B

 +-(5) ParallelCollectionRDD[7] at parallelize at <console>:24 [Memory Deserialized 1x Replicated]


scala> pairs.toDebugString

res13: String = (5) ParallelCollectionRDD[7] at parallelize at <console>:24 []






다음은 RangePartitioner 예제이다. 내용은 비슷해보인다.



scala> import org.apache.spark.RangePartitioner

import org.apache.spark.RangePartitioner


scala> new RangePartitioner(3, pairs)

res9: org.apache.spark.RangePartitioner[Int,Int] = org.apache.spark.RangePartitioner@7d2d


scala> val rangePartitioned = pairs.partitionBy(new RangePartitioner(3, pairs)).persist()

rangePartitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[8] at partitionBy at <console>:28


scala> rangePartitioned.collect

res10: Array[(Int, Int)] = Array((1,1), (2,2), (3,3))


scala> rangePartitioned.partitions.length

res11: Int = 3



RangePartitioner API(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/RangePartitioner.html)를 살펴보면, ordering와 정렬순서(오름차순/내림차순)으로 할 수 있는 형태가 있다. HashPartitioner와 크게 다른 내용이라 할 수 있을 듯 싶다. 

소스 : https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala

public RangePartitioner(int partitions,
                RDD<? extends scala.Product2<K,V>> rdd,
                boolean ascending,
                scala.math.Ordering<K> evidence$1,
                scala.reflect.ClassTag<K> evidence$2)





'scala' 카테고리의 다른 글

[scala] Product 이해하기  (0) 2017.08.10
[spark] [펌질] wide dependecy, narrow dependency  (0) 2017.08.08
[spark2] cache()와 persist()의 차이  (0) 2017.08.01
[scala] scalatest에서 Exception 처리  (0) 2017.07.27
[scala] scalablitz  (0) 2017.07.27
Posted by '김용환'
,

Spark에서는 연산할 때 스토리 레벨에 따라 지원하는 api, cache()와 persist()가 존재한다. 



RDD에 cache를 저장한 예제를 살펴본다.

scala> val c = sc.parallelize(List("samuel"), 2)

c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> c.getStorageLevel

res0: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)


scala> c.cache

res1: c.type = ParallelCollectionRDD[0] at parallelize at <console>:24


scala> c.getStorageLevel

res2: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)




cache()는 기본 저장소 레벨이 MEMORY_ONLY로만으로 사용된다. 





이번에는 persist() 예제를 진행한다. persist()는 다음 스토리지 레벨에 맞게 사용할 수 있다. 

여기에서 SER은 serialized을 의미한다. disk 저장 위치는 로컬이다. 


* 크게 분류된 스토리지 레벨(Storage Level)


Level

Space used

cpu time 

In memory 

 On disk

Serialized 

MEMORY_ONLY 

High

Low 

 N

MEMORY_ONLY_SER

Low 

High 

 N Y

MEMORY_AND_DISK

High

Medium 

Some 

 Some Some

MEMORY_AND_DISK_SER 

Low 

High 

Some 

 Some Y

DISK_ONLY 

Low 

High 


 Y Y



scala> import org.apache.spark.storage.StorageLevel;

import org.apache.spark.storage.StorageLevel


// 기존 c를 활용.


scala> c.persist(StorageLevel.MEMORY_ONLY_SER)

res4: c.type = ParallelCollectionRDD[1] at parallelize at <console>:24


scala> c.getStorageLevel

res5: org.apache.spark.storage.StorageLevel = StorageLevel(memory, 1 replicas)





scala> val c = sc.parallelize(List("samuel"), 2)

c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:25


scala> c.persist(StorageLevel.MEMORY_AND_DISK)

res7: c.type = ParallelCollectionRDD[2] at parallelize at <console>:25


scala> c.getStorageLevel

res8: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)



참고로 rdd에 persist를 사용하고 다시 persist를 사용하면 에러가 발생한다.

scala> c.persist(StorageLevel.MEMORY_AND_DISK)
java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
  at org.apache.spark.rdd.RDD.persist(RDD.scala:169)
  at org.apache.spark.rdd.RDD.persist(RDD.scala:194)
  ... 48 elided




스토리지 레벨에 대한 공식 문서 내용은 다음과 같다. 

https://spark.apache.org/docs/latest/rdd-programming-guide.html


Storage Level         | Meaning

--------------------------------

MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

MEMORY_AND_DISK  |  Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

MEMORY_ONLY_SER  |  Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

DISK_ONLY         | Store the RDD partitions only on disk.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.  | Same as the levels above, but replicate each partition on two cluster nodes.

OFF_HEAP (experimental)  | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.





원본 데이터 저장과 serialized의 성능 차이는 어느 개발자가 써놓은 내용이 있다..

출처 : http://sujee.net/wp-content/uploads/2015/01/spark-caching-1.png





또한, persist를 잘못 사용하면 애플리케이션이 종료해도 메모리를 계속 사용하는 문제가 발생할 수 있다고 한다.

출처 : http://tomining.tistory.com/84




따라서 애플리케이션 내에서 persist()호출이 된 rdd에 unpersist()를 호출해야 한다.!

scala> c.unpersist()
res10: c.type = ParallelCollectionRDD[2] at parallelize at <console>:25



Posted by '김용환'
,


scalatest에서 Exception처리하는 예제이다. 


다음과 같은 포맷으로 개발한다. 


intercept[Exception] {

   메소드

}


import org.scalatest.FunSuite
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class AppSuite extends FunSuite {

test("test") {
checkParam("a")
}

test("null test") {
intercept[IllegalArgumentException] {
checkParam(null)
}
}

def checkParam(param: String): Int = param match {
case null => throw new IllegalArgumentException("None is illegal.")
case _ => 0
}

}


Posted by '김용환'
,

[scala] scalablitz

scala 2017. 7. 27. 19:49

coursera의 scala 강의 중에 scalablitz의 흔적(monoid 설명)이 있어서 함 찾아봤다.



scala 2.9(2011년)부터 parallel 패키지가 추가되었다. 


그러나 3rd party로 scalablitz(http://scala-blitz.github.io/)로 있긴 했지만, 2014년 쯔음부터는 더 이상 운영되지 못했다. 이제는 역사속으로 사진 라이브러리이지만... 


Parallel Collections were originally introduced into Scala in release 2.9. Why another data-parallel collections framework? While they provided programmers with seamless data-parallelism and an easy way to parallelize their computations, they had several downsides. First, the generic library-based approach in Scala Parallel Collections had some abstraction overheads that made them unsuitable for certain types of computations involving number crunching or linear algebra. To make efficient use of parallelism, overheads like boxing or use of iterators have to be eliminated. Second, pure task-based preemptive scheduling used in Scala Parallel Collections does not handle certain kinds of irregular data-parallel operations well. The data-parallel operations in this framework are provided for a wide range of collections, and they greatly reduce both of these overheads.




libraryDependencies += "com.github.scala-blitz" %% "scala-blitz" % "1.1"



스칼라의 병렬 콜렉션은 scala.collection.par 패키지를 이용할 수 있다. 스칼라 병렬 콜렉션처럼 일반 콜렉션에서 toPar 메소드를 호출하면 병렬 객체를 리턴한다. 


import scala.collection.par._
import scala.collection.par.Scheduler.Implicits.global

def mean(a: Array[Int]): Int = {
val sum = a.toPar.reduce(_ + _)
sum / a.length
}

val m = mean(Array(1, 3, 5))
print(m)

결과 값은 3이다.



이후에 예제 코딩을 진행하면 기존 스칼라 코드와 충돌이 나면서 테스트를 계속하기 애매해진다.


Error:(25, 5) reference to text is ambiguous;

it is both defined in method totalLength and imported subsequently by 

import scala._




slideshare에서 scalablitz 맛을 보는데 도움이 되는 것 같다. generic 관련해서 깔끔해진 느낌이 있긴 하다.. 

(가뜩이나 스칼라는 공부할수록 복잡해지는 느낌이 있긴 하다.......)


ScalaBlitz from Aleksandar Prokopec



더 궁금하면 아래 링크를 참조한다.


http://apprize.info/programming/scala/7.html


Posted by '김용환'
,


스칼라의 콜렉션은 Immutable, mutable 말고

병렬 콜렉션(Par...)와 순차 콜렉션(일반적으로 사용하는 콜렉션)과 상위 클래스(Gen...)으로 나눠진다. 

참고로 Gen은 general의 약자이다. 



출처: http://docs.scala-lang.org/overviews/parallel-collections/architecture.html




아래의 예를 보면 기본 콜렉션인 Array에 par 함수를 호출하면서 병렬 콜렉션인 ParArray를 인스턴스로 얻는다.

val parArray = (1 to 100).toArray.par

parArray: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)





http://docs.scala-lang.org/overviews/parallel-collections/overview.html 에 병렬 콜렉션에 대한 예제가 있는데 심플해서 도움이 된다.



map 예제

val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames.map(_.toUpperCase)

lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

res7: scala.collection.parallel.immutable.ParSeq[String] = ParVector(SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN)




fold 예제

val parArray = (1 to 100).toArray.par
parArray.fold(0)(_ + _)

parArray: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

res5: Int = 5050




filter 예제

val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames.filter(_.head >= 'f')

lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

res7: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Jackson, Rodin)





성능을 높이는 splitter와 combiner(builder)에 대한 정보는 coursera 강의를 참조할 수 있다.
https://github.com/rohitvg/scala-parallel-programming-3/wiki/Splitters-and-Combiners





아래 자료를 보면 병렬 콜렉션의 장점(속도)를 알 수 있고 병렬 콜렉션의 이해도, custom splitter/builder 내용이 담겨 있다.


Scala Parallel Collections from Aleksandar Prokopec






Posted by '김용환'
,