scala의 어떤 클래스가 클래스를 상속받고 트레이트(trait)를 믹스인하는 초기화는 다음 순서와 같다.


object Main extends App {
trait T1 {
println("t1") // 4
}

trait T2 {
println("t2") // 5
}

class BaseParent {
println("base-parent") // 2
}

class Parent extends BaseParent {
println("parent") // 3
}

class Example extends Parent with T1 with T2 {
println("example") // 6
}

println(s"before") // 1
new Example
println(s"after") // 7
}


결과는 다음과 같다. 


부모 클래스의 부모 클래스부터 초기화되고 다음에 부모 클래스가 생성된다. 

다음에 믹스인(mix in)한 트레이트가 초기화된다. 


before

base-parent

parent

t1

t2

example

after





'scala' 카테고리의 다른 글

[scala] 쉘 실행하기  (0) 2016.12.13
[scala] 클래스 초기화하기(생성)  (0) 2016.12.13
[scala] for 내장  (0) 2016.12.08
[scala] try-catch/Try-match/Either/Validation  (0) 2016.12.06
[scala] 부분 적용 함수 / 커링 / 부분 함수  (0) 2016.12.05
Posted by '김용환'
,

[scala] for 내장

scala 2016. 12. 8. 11:44

for 내장(comprehension) (또는 for yield문)에 대한 예시이다. 


for 내장은 다중 for 문으로 생각하는 게 좋다. 


그리고, yield를 map/flatmap/foreach와 비슷한 느낌으로 보는게 좋은 듯 하다. 따라서 for 내장의 라인은 큰 의미가 있기 때문에 주의해서 쓸 필요가 있다. 





즉, 아래와 같은 문장은 쓸 수 없고, 컴파일 에러가 난다. 

val m = for {
a = 5
b = 3
} yield (a + b)

에러 내용이다.


Error:(12, 7) '<-' expected but '=' found.

    a = 5

Error:(13, 1) illegal start of simple expression

          b = 3




아래 코드도 에러가 발생하지만, for 내장에 대해서 조금 이해할 수 있다. 

val m = for {
a <- 1
b <- 2
} yield (a + b)


에러 내용이다.


Error:(12, 10) value flatMap is not a member of Int

    a <- 1

Error:(13, 10) value map is not a member of Int

    b <- 2




다음과 같은 에러가 발생하는데. 첫 번째 라인에서는 flatmap, 다음은 map관련 에러가 발생한다. 

즉, for 내장은 내부적으로 변환된다는 의미를 가진다. 

마지막 문장은 map이고, 이전 문장은 flatmap으로 변환된다.





동작되는 for 내장 예시를 소개한다. 

for {
l <- List(1,2,3,4,5)
} yield l.println


결과는 다음과 같다.


1

2

3

4

5





val m = for {
s <- List("a", "bb", "ccc")
} yield s.length
println(m)

결과는 다음과 같다. 


List(1, 2, 3)



그리고 2차 for문처럼 쓸 수 있다. 


for {
i <- 1 to 2
j <- 3 to 4
} println(s"$i + $j = ${i + j}")


결과는 다음과 같다.


1 + 3 = 4

1 + 4 = 5

2 + 3 = 5

2 + 4 = 6





object Main extends App {
val m = for {
s <- List("1", "a", "3")
c <- s
} yield s"${s} - ${c}"
println(m)
}

결과는 다음과 같다.


List(1 - 1, a - a, 3 - 3)


val m = for {
s <- List("1", "a", "3")
c <- s
if c.isLetter
} yield s"${s} - ${c}"
println(m)

결과는 다음과 같다.


List(a - a)




yield에 콜렉션(튜플)을 사용할 수 있다. 

val m = for {
s <- List("b4", "a", "3")
c <- s
} yield (s, c)

m.foreach(println)


결과는 다음과 같다.

(b4,b)

(b4,4)

(a,a)

(3,3)



option으로 테스트 해본다.

val m = for {
i <- List(1, 2, 3, 0)
} yield (i * 10)

m.foreach(println)

결과는 다음과 같다.


10

20

30

0



Some으로 내용을 감싸면 내부적으로 Some  None에 따라 값을 구한다. 

val m = for {
Some(i) <- List(Some(1), Some(2), Some(3), None)
} yield (i * 10)

m.foreach(println)



결과는 다음과 같다.


10

20

30




참고로 인스턴스는 사용한 콜렉션을 따르다. 아래 결과는 Vector(10, 20, 30)이다. 


val m = for {
Some(i) <- Vector(Some(1), Some(2), Some(3), None)
} yield (i * 10)

println(m)






함수를 사용할 때는 리턴 타입이 Option로 감쌀 때 잘 동작한다. 그냥 하면 에러가 발생한다. 



다음은 에러 케이스이다. 

def isAllDigits(s: String) : Int = {
if (s forall Character.isDigit) {
return s.toInt
}
0
}

val m = for {
a <- isAllDigits("1")
b <- isAllDigits("2")
c <- isAllDigits("9")
} yield (a + b + c)

println(m)

결과는 다음과 같다. 


Error:(33, 21) value flatMap is not a member of Int

    a <- isAllDigits("1")

Error:(34, 21) value flatMap is not a member of Int

    b <- isAllDigits("2")

Error:(35, 21) value map is not a member of Int

    c <- isAllDigits("9")




다음은 Option을 래핑한 예시이다.


def isAllDigits(s: String) : Option[Int] = {
if (s forall Character.isDigit) {
return Some(s.toInt)
}
Some(0)
}

val m = for {
a <- isAllDigits("1")
b <- isAllDigits("2")
c <- isAllDigits("9")
} yield (a + b + c)

println(m)


결과는 다음과 같다. 


Some(12)



잘못된 결과가 나와도 잘 동작한다. 

def isAllDigits(s: String) : Option[Int] = {
if (s forall Character.isDigit) {
return Some(s.toInt)
}
Some(0)
}

val m = for {
a <- isAllDigits("1")
b <- isAllDigits("a")
c <- isAllDigits("2")
} yield (a + b + c)

println(m)



Posted by '김용환'
,


scala에서 Exception처리에 대한 다양한 내용을 살펴본다.




1. try-catch 

자바와 비슷하게 처리한다.


object Main extends App {
def throwException: Unit = {
try {
println("A".toLong)
} catch {
case ex: Exception => println(ex)
} finally {
println("finally")
}
}
throwException
}


결과는 다음과 같다.


java.lang.NumberFormatException: For input string: "A"

finally




2. scala.util.Try-match



import scala.util.{Failure, Success, Try}

object Main extends App {
def throwException: Any = {
Try {
println("A".toLong)
} match {
case Success(result) => result
case Failure(exception) => exception
}
}
println(throwException)
}

결과는 다음과 같다.


java.lang.NumberFormatException: For input string: "A"



다른 예제


val loginUser =
Try(requestHeader.headers("loginUser")) match {
case Success(user) => user
case _ => ""
}




3. scala.util.Either 


Either는 Exception과 결과 값을 한번에 받을 수 있는 타입이다. 


def throwException: Either[Long, Throwable] = {
try {
Left("A".toLong)
} catch {
case ex: Exception => Right(ex)
} finally {
println("finally")
}
}
println(throwException)
println(throwException.merge)


결과는 다음과 같다.


finally

Right(java.lang.NumberFormatException: For input string: "A")

finally

java.lang.NumberFormatException: For input string: "A"



merge메소드는 예외와 리턴값 사이의 값을 하나로 병합한다. 





4. Exception.allCatch


1,2,3에 연관되어 Exception.allCatch라는 것이 있다. 이를 이용하면 try/catch를 한 줄에 사용할 수도 있다.

import scala.util.control.Exception._
println(allCatch.opt("A".toLong))
println(allCatch.toTry("A".toLong))
println(allCatch.either("A".toLong))

결과는 다음과 같다.


None

Failure(java.lang.NumberFormatException: For input string: "A")

Left(java.lang.NumberFormatException: For input string: "A")




5. scalaz의 disjunction

Either보다 더 좋은 형태의 예외 처리를 할 수 있다. 

scala 고수 개발자들이 scalaz의 disjunction을 추천한다고 하니 알아두면 좋을 것 같다.


\/를 사용하면서 해괴하지만, 간단해서 쓴다는...



val m = scalaz.\/.right[Throwable, Int](5).map(_ * 2)
println(m)
println(m.merge)

val n = scalaz.\/.left[Throwable, String](new Exception("1")).map(_.toLong)
println(n)
println(n.merge)

val ez : String \/ String = "a".right
println(ez)
println(ez.merge)


결과는 다음과 같다. Either와 비슷하게 쓰인다. 훨씬 either보다는 편하게 쓸 수 있다. 


\/-(10)

10


-\/(java.lang.Exception: 1)

java.lang.Exception: 1


\/-(a)

a



자세한 내용은 scalaz 7.2(https://github.com/scalaz/scalaz/tree/series/7.2.x)를 참고한다.



5. Validation


scalaz의 Validation도 사용할 수 있다. 


import scalaz._

def positive(i: Int): Validation[Exception, Int] = {
if (i > 0) Success(i) // <1>
else Failure(new NumberFormatException("should number > 0"))
}
println(positive(1))
println(positive(-1))


Posted by '김용환'
,


부분 적용 함수(partially applied function)에 대한 예시이다. 마치 함수처럼 사용할 수 있다. 


def test(s1: String)(s2: String) = s1 + s2
val a = test("s1")
println(a)

이렇게 하면 에러가 발생한다.


Error:(10, 15) missing argument list for method test in object Main

Unapplied methods are only converted to functions when a function type is expected.

You can make this conversion explicit by writing `test _` or `test(_)(_)` instead of `test`.

  val a = test("s1")




에러가 발생하지 않도록 하려면 다음처럼 _를 추가해야 한다.

def test(s1: String)(s2: String) = s1 + s2
val a = test("s1") _
println(a)


실제 테스트 결과를 출력하면 다음과 같다.

def test(s1: String)(s2: String) = s1 + s2
val a = test("s1") _
println(a("s2"))


만약 매개변수가 3개라면 _(언더바)를 2개를 써야할까? 하나면 쓰면 된다. _는 나머지를 가르키는 대명사 역할을 한다.

def test(s1: String)(s2: String)(s3: String) = s1 + s2 + s3
val a = test("s1") _
println(a)


만약 다음과 같은 형태를 사용할 수 있다.

def test(s1: String) = (s2: String) => s1 + s2
val a = test("s1")
println(a)



언커링하려면 다음과 같다.

def test(s1: String, s2: String) = s1 + s2
val curried = (test _).curried
println(curried("s1")("s2"))
println(test("s1", "s2"))


결과는 다음과 같다.


s1s2

s1s2




curried함수는 Function2 트레이트의 함수로서 정의되어 있는 함수이다. 

/** Creates a curried version of this function.
*
* @return a function `f` such that `f(x1)(x2) == apply(x1, x2)`
*/
@annotation.unspecialized def curried: T1 => T2 => R = {
(x1: T1) => (x2: T2) => apply(x1, x2)


만약 커링함수를 언커링할 수 있다. 

val uncurried = Function.uncurried(curried)
println(uncurried("s1", "s2"))



함수와 콜렉션를 받는 함수라면 다음처럼 사용할 수 있다. 

object MyCombinator {
def foreach[A, U](f: A => U)(list: List[A]): Unit = list foreach f
}
val printX = (s: String) => println(s)
val f = MyCombinator.foreach(printX) _
f(List("1", "2"))

결과는 다음과 같다.


1

2








다음은 부분 함수이다. PartialFunction을 사용한다. 부분 적용 함수와 완전히 다른 형태이다. 입력 값이 일정 범위에 있는지를 정의할 수 있는 함수 정도(수학적인 부분을 의미하는지 알려주는 함수)가 될 것이다. 따라서 case 문으로 많이 사용된다. 

val one: PartialFunction[Int, String] = { case 1 => "one" }
println(one.isDefinedAt(1))
println(one.isDefinedAt(2))
println(one(1))


결과는 다음과 같다.


true

false





PartialFunction의 또 다른 예시이다. 

  val div: PartialFunction[(Double, Double), Double] = {
case (x, y) if y != 0 => x /y
}
println(div.isDefinedAt(1, 1))
println(div.isDefinedAt(2, 1))
println(div(1, 1))
}

결과는 다음과 같다.


true

true

1.0




PartialFunction는 case문 아니면 에러가 발생한다.  다음과 같은 코드에 컴파일 에러가 발생한다.

val one: PartialFunction[Int, String] = { "" }


Error:(9, 45) type mismatch;

 found   : String("")

 required: PartialFunction[Int,String]

  val one: PartialFunction[Int, String] = { "" }




항상 case 문이 있어야 하지만, 다음 코드는 컴파일 에러가 발생하지 않는다. 정의대로 구현했기 때문이다.

val one = new PartialFunction[Int, String] {
def apply(d: Int) = ""
def isDefinedAt(d: Int) = d != 0
}




PartialFunction 트레이트와 PartionFuction 오브젝트가 존재한다. 

trait PartialFunction[-A, +B] extends (A => B) { self =>
import PartialFunction._ ..

object PartialFunction {






참고로 PartialFunction은 예외를 발생하지 않는다.




    val f: PartialFunction[String, String] = { case "ping" => "pong"}


    val g: PartialFunction[List[Int], String] = {

      case Nil =>"one"

      case x :: rest =>

        rest match {

          case Nil => "two"

        }

    }




패턴매칭에 맞으면 true/false를 리턴한다. 


 println(Lists.f.isDefinedAt("ping"))

 println(Lists.g.isDefinedAt(List(1,2,3)))




'scala' 카테고리의 다른 글

[scala] for 내장  (0) 2016.12.08
[scala] try-catch/Try-match/Either/Validation  (0) 2016.12.06
[scala] 꼬리 재귀(tail recursion)와 @tailrec  (0) 2016.12.05
[scala] Future말고 Promise  (0) 2016.11.27
[scala] Future 2  (0) 2016.11.23
Posted by '김용환'
,

스칼라에서의 꼬리 재귀와 tailrec를 공부한다.




머리 재귀 예시이다. 일면 stack over flow가 발생할 수 있다. 


def sum1(list: List[Int]): Int = list match {
case Nil => 0
case t :: tail => t + sum1(tail)
}
println(sum1((1 to 2).toList))
//println(sum1((1 to 1000000).toList)) //Exception in thread "main" java.lang.StackOverflowError

def sum2(list: List[Int]): Int = {
if (list.isEmpty) 0
else list.head + sum2(list.tail)
}
println(sum2((1 to 2).toList))
//println(sum2((1 to 1000000).toList)) //Exception in thread "main" java.lang.StackOverflowError

결과는 3이다.





중간값을 가진 꼬리 재귀로 구현해 보자. 



def sum3(list: List[Int], acc: Int): Int = {
if (list.isEmpty) acc
else sum3(list.tail, list.head + acc)
}
println(sum3(((1 to 2).toList), 0))
println(sum3((1 to 1000000).toList, 0))

def sum4(list: List[Int], acc: Int): Int = list match {
case Nil => acc
case h :: tail => sum4(tail, h + acc)
}
println(sum4(((1 to 2).toList), 0))
println(sum4((1 to 1000000).toList, 0))

결과는 다음과 같다. 중간 값을 저장했기 때문에 stack over flow가 발생하지 않았다. 


3

1784293664

3

1784293664






중간값을 꼬리 재귀에 조금만 수정해서 앞의 entry 함수를 하나 만들어본다.

def tailrecSum(l: List[Int]): Int = {
def sum5(list: List[Int], acc: Int): Int = list match {
case Nil => acc
case x :: tail => sum5(tail, acc + x)
}
sum5(l, 0)
}

println(tailrecSum((1 to 1000000).toList))


결과는 다음과 같다. 


1784293664








스칼라에는 꼬리 재귀 최적화 기능을 가지고 있다. 


@tailrec라고 재귀 함수 앞에 붙이면 스칼라 컴파일러에 꼬리 재귀가 있으니 최적화라고 알려준다.


@tailrec를 사용하려면 다음 import문을 사용한다.

import scala.annotation.tailrec




앞에 실행했던 예시는 아래와 같이 sum5앞에 @tailrec를 붙였다.

def tailrecSum(l: List[Int]): Int = { @tailrec
def sum5(list: List[Int], acc: Int): Int = list match {
case Nil => acc
case x :: tail => sum5(tail, acc + x)
}
sum5(l, 0)
}

println(tailrecSum((1 to 1000000).toList))



@tailrec는 아무 때나 최적화되지 않고, 심지어 에러가 발생할 수 있으니. 신중히 써야 할 수 있다.



아래 코드는 Recursive call not in position 이라는 컴파일 에러가 발생한다.

@tailrec
def factorial(i: BigInt): BigInt = {
if (i == 1) i
else i * factorial(i - 1)
}

for (i <- 1 to 10)
println(s"$i:\t${factorial(i)}")


재귀 함수가 public이면, 상속받아서 쓸 수 있기 때문에 쓰지 못하도록 에러를 발생시킨다.


class Printer(msg: String) {
@tailrec
def printMessageNTimes(n: Int): Unit = {
if(n > 0){
println(msg)
printMessageNTimes(n - 1)
}
}
}

new Printer("m").printMessageNTimes(10000)

could not optimize @tailrec annotated method printMessageNTimes: it is neither private nor final so can be overridden



final 메소드로 수정하니. 정상적으로 동작한다.

class Printer(msg: String) {
@tailrec
final def printMessageNTimes(n: Int): Unit = {
if(n > 0){
println(msg)
printMessageNTimes(n - 1)
}
}
}

new Printer("m").printMessageNTimes(10000)




트램폴린(trampoline)은 여러 함수가 다른 함수를 호출하여 이루어지는 재귀를 말한다. 


X를 호출하면 A를 호출했다가 A의 내부에서 B를 호출했고 B의 내부에서 B를 호출하면서. 계속 왔다 갔다하는 형태의 재귀를 말한다. 



스칼라에는 TailCall(https://www.scala-lang.org/api/current/scala/util/control/TailCalls$.html) 이라를 객체가 있으니, 이를 참조한다.


import scala.util.control.TailCalls._

def isEven(xs: List[Int]): TailRec[Boolean] =
  if (xs.isEmpty) done(true) else tailcall(isOdd(xs.tail))

def isOdd(xs: List[Int]): TailRec[Boolean] =
 if (xs.isEmpty) done(false) else tailcall(isEven(xs.tail))

isEven((1 to 100000).toList).result

def fib(n: Int): TailRec[Int] =
  if (n < 2) done(n) else for {
    x <- tailcall(fib(n - 1))
    y <- tailcall(fib(n - 2))
  } yield (x + y)

fib(40).result


'scala' 카테고리의 다른 글

[scala] try-catch/Try-match/Either/Validation  (0) 2016.12.06
[scala] 부분 적용 함수 / 커링 / 부분 함수  (0) 2016.12.05
[scala] Future말고 Promise  (0) 2016.11.27
[scala] Future 2  (0) 2016.11.23
[scala] Future 1  (0) 2016.11.22
Posted by '김용환'
,

[scala] Future말고 Promise

scala 2016. 11. 27. 22:30



scala에서는 Future보다는 Promise를 많이 사용한다.


Promise를 사용해 Future를 생성하고 완료할 수 있다. Promise는 Future에 포함된 값을 명시적으로 설정할 수 있는 핸들이다.



import scala.concurrent.Promise
val p = Promise[Int]
println(p.future.value)
println(p.success(42))
println(p.future.value)

결과는 다음과 같다. 


None

Success(42)

Some(Success(42))


sucess나 fail을 호출해야 future 값을 얻는 구조이다. 





그리고, Promise에서 Future를 가지고 사용할 수도 있다. 

import scala.concurrent.Promise
val promise = Promise[String]()
val future = promise.future

println(future.value)
println(promise.success("hello").isCompleted)
println(future.value.get)


결과는 다음과 같다. Promise가 계산되어야 future 값을 얻을 수 있다. 


None

true

Success(hello)





Promise는 case class에서도 사용할 수 있다. 


case class Person(number: Int)
val person = Promise[Person]
person.success(Person(10))
println(person.future.value)






참고로 play2 프레임워크에는 비동기 Promise를 사용하는 예제가 있다. 


https://playframework.com/documentation/2.5.x/ScalaAsync



import play.api.libs.concurrent.Execution.Implicits.defaultContext


val futurePIValue: Future[Double] = computePIAsynchronously()

val futureResult: Future[Result] = futurePIValue.map { pi =>

  Ok("PI value computed: " + pi)

}







'scala' 카테고리의 다른 글

[scala] 부분 적용 함수 / 커링 / 부분 함수  (0) 2016.12.05
[scala] 꼬리 재귀(tail recursion)와 @tailrec  (0) 2016.12.05
[scala] Future 2  (0) 2016.11.23
[scala] Future 1  (0) 2016.11.22
[zeppelin] spark의 변수 공유하기  (0) 2016.11.18
Posted by '김용환'
,

[scala] Future 2

scala 2016. 11. 23. 11:49


Future에 있어서 약점은 타임아웃도 Exception이라는 점이다. 


타임아웃이 발생했을 때 Exception이 발생한다. 

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

println("Start")
val f = Future {
println("1")
Thread.sleep(2000) // 실제 코드가 들어갈 장소
println("2")
}

println("End")
val result = Await.result(f, 1 second)


결과는 예외이다. 

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [1 second]

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)

at scala.concurrent.BlockContext$DefaultBlockContext$

....




일반 Exception과 타임아웃 Exception을 구분하기 위해 클래스를 감쌀 수 있다. 


https://github.com/PacktPublishing/Scala-High-Performance-Programming/blob/master/chapter6/src/main/scala/highperfscala/concurrency/future/SafeAwait.scala



result 함수는 Future는 Awaitable을 상속받은 클래스와 시간을 받고 Future를 감싼다. TimeoutException시는 None으로 리턴한하게 한다. 

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

object SafeAwait {
def result[T](awaitable: Awaitable[T],
atMost: Duration): Option[T] = Try(Await.result(awaitable, atMost)) match {
case Success(t) => Some(t)
case Failure(_: TimeoutException) => None
case Failure(e) => throw e
}
}

val f1 = Future {
println("11")
5 + 10
}

println(SafeAwait.result(f1, 1 second))

val f2 = Future {
println("11")
Thread.sleep(5000)
5 + 10
}

println(SafeAwait.result(f2, 1 second))


결과는 다음과 같다. 


11

Some(15)

11

None




Future의 변환에서 Exception이 발생할 수 있는데, 이에 대한 Future에서 어떻게 대응할 수 있는 살펴본다.



문자를 숫자로 변환했으니. map 단계에서 변환할 것이다. 

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).map(i => {
println("Multiplying")
i * 2
})

Await.result(f, 1 second)


<결과>


Exception in thread "main" java.lang.NumberFormatException: For input string: "not-an-integer"

at java.lang.NumberFormatException.forInputString






Future에는 recover라는 메소드가 있다. 이를 활용하면, 다운 스트림 에러를 보정할 수 있다. 



def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete { v => p complete (v recover pf) }
p.future
}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recover {
case _: NumberFormatException => 2
}.map(i => {
println("Multiplying")
i * 2
})
println(Await.result(f, 1 second))

결과는 다음과 같다. 


Multiplying

4





recoverWith라는 메소드도 있다. 부분 함수를 사용할 수 있으며, recover와 동일하게 처리할 수 있다. 


def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])
         (implicit executor: ExecutionContext): Future[U] = {




import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recoverWith {
case _: NumberFormatException => Future(2)
}.map(i => {
println("Multiplying")
i * 2
})
println(Await.result(f, 1 second))

결과는 다음과 같다. 


Multiplying

4




Await.result 대신 onComplete를 사용할 수 있다. 



import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recover {
case _: NumberFormatException => 2
}.map(i => {
println("Multiplying")
i * 2
}).onComplete {
case Success(i) => println(s"$i")
case Failure(e) => println(s"${e.getMessage}")
}

Thread.sleep(1000)




Future는 이미 계산된 값으로 만들 수 있다. 그리고 특정 값을 래핑한 Future로 만들 수 있다. (Future를 리턴할 때)


예)

val stringFuture: Future[String] = Future.successful("yes")



Future.successful이나 Future.failed를 이용할 수 도 있고, 더 추상화된 클래스인 Promise를 사용할 수 있다. 아래 예시 결과에서 본 것처럼 모두 계산된 (isCompleted=true) 상태이다. 

import scala.concurrent.{Future}
import scala.concurrent.Promise

val f1 = Future.successful(3)
println(f1.isCompleted)
println(f1)

val f2 = Future.failed(new IllegalArgumentException("no!"))
println(f2.isCompleted)
println(f2)


val promise = Promise[String]()
val future = promise.future
println(promise.success("hello").isCompleted)


결과


true

scala.concurrent.impl.Promise$KeptPromise@146ba0ac

true

scala.concurrent.impl.Promise$KeptPromise@4dfa3a9d

true




'scala' 카테고리의 다른 글

[scala] 꼬리 재귀(tail recursion)와 @tailrec  (0) 2016.12.05
[scala] Future말고 Promise  (0) 2016.11.27
[scala] Future 1  (0) 2016.11.22
[zeppelin] spark의 변수 공유하기  (0) 2016.11.18
spark의 mapValues/reduceByKey 예시  (0) 2016.11.14
Posted by '김용환'
,

[scala] Future 1

scala 2016. 11. 22. 09:31


scala의 Future는 다음과 같이 단순하게 개발할 수 있다. 

마치 java의 Runnable과 비슷한 느낌으로 개발할 수 있다. (ExecutionContext라는 것은 빼고.)


object Main extends App {
import scala.concurrent.Future
import scala.concurrent.ExecutionContext

val context: ExecutionContext = scala.concurrent.ExecutionContext.global

println("Start")
Future {
println("1")
Thread.sleep(1000) // 실제 코드가 들어갈 장소
println("2")
} (context)

println("End")

Thread.sleep(1000) // 결과를 얻기 위해 기다리는 시간.
}


결과


Start

End

1

2



마지막에 결과를 얻기 위해 Thread.sleep을 주기가 민망한 코드이니. 고급스러운 Await를 사용한다.



import scala.concurrent.ExecutionContext
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val context: ExecutionContext = scala.concurrent.ExecutionContext.global

println("Start")
val f = Future {
println("1")
Thread.sleep(1000) // 실제 코드가 들어갈 장소
println("2")
} (context)

println("End")
val result = Await.result(f, 1 second) // Await로 변경.

결과는 동일하다. 



Future를 사용했다고 해서 바로 실행이 되지 않는다. 



import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global

val f = Future{ println("hello"); "world"}
println(f.value)
println(f.isCompleted)

val f1 = Future{ println("hello"); 10 + 20 }
println(f1.value)
println(f1.isCompleted)


결과는 다음과 같다.


hello

None

false


None

hello

false


Future가 실행되지 않았다. 단순한 출력은 되지만, Future의 내부 값이 실행되기 위해서는 작업이 필요하다.



onComplete를 실행해야 한다. 

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

val f = Future{ println("hello"); "world"}
println(f.value)
println(f.isCompleted)

println
f.onComplete {
case Success(value) => println(value)
case Failure(e) => e.printStackTrace
}
println(f.value)
println(f.isCompleted)


결과는 다음과 같다. onComplete가 실행되어야 f.value를 알 수 있게 된다. 


hello

None

false


world

Some(Success(world))

true



onComplete의 실행은 onSuccess와 onFailure의 동작과 비슷한 역할을 기대할 수 있다.


import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

val f = Future{ println("hello"); "world"}
println(f.value)
println(f.isCompleted)

println
f.onSuccess {
case value => println(value)
}
f.onFailure {
case e => e.printStackTrace
}

println(f.value)
println(f.isCompleted)

결과는 동일하다.


hello

None

false


world

Some(Success(world))

true





참고로 scala EPEL은 값이 미리 계산되니.. 유의할 필요가 있다.


scala>   import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.ExecutionContext.Implicits.global


scala> import scala.concurrent.Future

import scala.concurrent.Future


scala>  val f = Future{ println("FOO"); 10 + 2}

FOO

f: scala.concurrent.Future[Int] = List()


scala>  f.value

res2: Option[scala.util.Try[Int]] = Some(Success(12))



Future의 계산이 완료되면, Future는 계산된 값에 대한 단지 래퍼이다. 다시 계산을 수행하길 원한다면, 새로운 Future 인스턴스를 생성해야 한다.




Future는 상태를 가지고 있다.


Future 내부 객체를 살펴보면, 다음과 같이 계산 결과 여부와 값을 가지고 있음을 알 수 있다. 

def isCompleted: Boolean
def value: Option[Try[T]]




Future는 변환 메소드를 사용할 수 있다. map, filter, foreach을 사용하는 예시이다.


import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

Future(1).map(_ + 11).filter(_ % 2 == 0).foreach(println)

Thread.sleep(1000)

결과는 12이다.



기존에 선언했던 context를 내부적으로 global로 import해서 간단하게 적용할 수 있다. global은 암시 객체인데, map의 시그내처에서 사용할 수 있도록 선언되어 있다. 

(컬렉션의 map과 다르다)


implicit lazy val global: ExecutionContextExecutor = 
                 impl.ExecutionContextImpl.fromExecutor(null: Executor)
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] 
         = { // transform(f, identity)
....
}



ExecutionContext는 런타임 비동기를 제공하는 Future 뒤의 시스템으로 생각될 수 있다. ExecutionContext는 트레이트로서 java의 Runnable을 실행하는 execute 메소드를 가지고 있다. 


trait ExecutionContext {

/** Runs a block of code on this execution context.
*
* @param runnable the task to execute
*/
def execute(runnable: Runnable): Unit

/** Reports that an asynchronous computation failed.
*
* @param cause the cause of the failure
*/
def reportFailure(@deprecatedName('t) cause: Throwable): Unit

/** Prepares for the execution of a task. Returns the prepared execution context.
*
* `prepare` should be called at the site where an `ExecutionContext` is received (for
* example, through an implicit method parameter). The returned execution context may
* then be used to execute tasks. The role of `prepare` is to save any context relevant
* to an execution's ''call site'', so that this context may be restored at the
* ''execution site''. (These are often different: for example, execution may be
* suspended through a `Promise`'s future until the `Promise` is completed, which may
* be done in another thread, on another stack.)
*
* Note: a valid implementation of `prepare` is one that simply returns `this`.
*
* @return the prepared execution context
*/
def prepare(): ExecutionContext = this

}




ExecutionContext 트레이트의 동반자 객체를 살펴보면 java의 ExecutorService와 Executor에서 ExecutionContext를 얻을 수 있다. 


ExecutorService를 통해 ExecutionContextExecutorService와 ExecutionContextExecutor를 생성하는 팩토리 메소드이다. 

object ExecutionContext {

/** Creates an `ExecutionContext` from the given `ExecutorService`.
*
* @param e the `ExecutorService` to use. If `null`, a new `ExecutorService` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @param reporter a function for error reporting
* @return the `ExecutionContext` using the given `ExecutorService`
*/
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit) : ExecutionContextExecutorService =
impl.ExecutionContextImpl.fromExecutorService(e, reporter)

/** Creates an `ExecutionContext` from the given `ExecutorService` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]].
*
* If it is guaranteed that none of the executed tasks are blocking, a single-threaded `ExecutorService`
* can be used to create an `ExecutionContext` as follows:
*
* {{{
* import java.util.concurrent.Executors
* val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
* }}}
*
* @param e the `ExecutorService` to use. If `null`, a new `ExecutorService` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @return the `ExecutionContext` using the given `ExecutorService`
*/
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)

/** Creates an `ExecutionContext` from the given `Executor`.
*
* @param e the `Executor` to use. If `null`, a new `Executor` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @param reporter a function for error reporting
* @return the `ExecutionContext` using the given `Executor`
*/
def fromExecutor(e: Executor, reporter: Throwable => Unit) : ExecutionContextExecutor =
impl.ExecutionContextImpl.fromExecutor(e, reporter)

/** Creates an `ExecutionContext` from the given `Executor` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]].
*
* @param e the `Executor` to use. If `null`, a new `Executor` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @return the `ExecutionContext` using the given `Executor`
*/
def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)


impl.ExecutionContextImpl의 구현은 다음과 같다. ForkJoinTask를 구현한 내부 내부 클래스와 fromExecutor로 구성되어 있다. 

private[concurrent] object ExecutionContextImpl {

final class AdaptedForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
final override def setRawResult(u: Unit): Unit = ()
final override def getRawResult(): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {
case anything: Throwable
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null
case some ⇒ some.uncaughtException(t, anything)
}
throw anything
}
}

def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
override def execute(command: Runnable) = executor.execute(command)
override def shutdown() { asExecutorService.shutdown() }
override def shutdownNow() = asExecutorService.shutdownNow()
override def isShutdown = asExecutorService.isShutdown
override def isTerminated = asExecutorService.isTerminated
override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit)
override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable)
override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t)
override def submit(runnable: Runnable) = asExecutorService.submit(runnable)
override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
}
}



리턴 타입인 ExeuctionContextExecutor와 ExecutionContextExecutorService는  java의 Excecution를 implement한다. 




/**
* An [[ExecutionContext]] that is also a
* Java [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html Executor]].
*/
trait ExecutionContextExecutor extends ExecutionContext with Executor


/**
* An [[ExecutionContext]] that is also a
* Java [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]].
*/
trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService




이제 아래 코드가 눈이 들어오기 시작한다. 


implicit lazy val
global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
def global: ExecutionContextExecutor = Implicits.global





Future의 apply 함수를 보면,다음과 같다.



def apply[T](body: =>T)
              (implicit @deprecatedName('execctx) executor: ExecutionContext)
               : Future[T] = impl.Future(body)


'scala' 카테고리의 다른 글

[scala] Future말고 Promise  (0) 2016.11.27
[scala] Future 2  (0) 2016.11.23
[zeppelin] spark의 변수 공유하기  (0) 2016.11.18
spark의 mapValues/reduceByKey 예시  (0) 2016.11.14
[scala] List.map(function) 예시  (0) 2016.11.08
Posted by '김용환'
,



zeppelin에서 공유 변수를 사용하는 방식이 있다.


디폴트(isolated)는 같은 노트에서도 변수를 공유하지 않는다. 



하나의 노트에서만 변수를 공유하려면,  다음과 같이 진행한다. 



zeppelin -> interpreter 설정 -> spark interpreter 설정에서 edit 실행 -> interpreter mode를 scope 변경 -> spark interpreter 설정에서 restart 실행



만약 전체 노트에서 공유하고 싶다면, shared mode로 변경한다. 




테스트를 다음과 같이 진행할 수 있다. 







참고로, 


- 변수 공유는 interpreter가 재시작되면 다시 노트를 실행해 결과를 캐싱하게 해야 한다.

- registerTempTable에 저장하면 모든 노트에서 사용할 수 있다. 

'scala' 카테고리의 다른 글

[scala] Future 2  (0) 2016.11.23
[scala] Future 1  (0) 2016.11.22
spark의 mapValues/reduceByKey 예시  (0) 2016.11.14
[scala] List.map(function) 예시  (0) 2016.11.08
[zeppelin] zeppelin으로 spark 연동 시 팁 (또는 주의 사항)  (0) 2016.11.07
Posted by '김용환'
,



sparkContext에 mapValues와 reduceByKey 예시를 설명한다.



코드로 간단히 설명하면 다음과 같다. 


val inputrdd = sc.parallelize(Seq(("arth",10), ("arth", 20), ("samuel", 60), ("jack", 65)))


val mapped = inputrdd.mapValues(x => 1);

mapped.collect.foreach(println)


val reduced = mapped.reduceByKey(_ + _)

reduced.collect.foreach(println)


mapValues는 map의 값을 1로 변경한다.

reduceByKey는 key의 값으로 키의 값이 동일한 개수를 얻는다 .


<결과>


inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[275] at parallelize at <console>:56
mapped: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[276] at mapValues at <console>:58
(arth,1)
(arth,1)
(samuel,1)
(jack,1)
reduced: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[277] at reduceByKey at <console>:60
(arth,2)
(samuel,1)
(jack,1)



아래 코드에 대한 설명한다 .

mapValues를 활용해 튜플을 추가하면, map의 값에 tuple을 추가한다.

reduceKey를 활용해서 키의 값을 근거로 값과 개수를 튜플로 추가할 수 있다.

reduceKey를 활용해서 얻은 결과값을 map을 이용해 평균 값을 구한다.

val inputrdd = sc.parallelize(Seq(("arth",10), ("arth", 20), ("samuel", 60), ("jack", 65))) val mapped = inputrdd.mapValues(x => (x, 1)); mapped.collect.foreach(println)
val reduced = mapped.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) reduced.collect.foreach(println) val average = reduced.map { x => val temp = x._2 val total = temp._1 val count = temp._2 (x._1, total / count) } average.collect.foreach(println)


<결과>
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[281] at parallelize at <console>:56
mapped: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[282] at mapValues at <console>:58
(arth,(10,1))
(arth,(20,1))
(samuel,(60,1))
(jack,(65,1))
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[283] at reduceByKey at <console>:60
(arth,(30,2))
(samuel,(60,1))
(jack,(65,1))
average: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[284] at map at <console>:62
(arth,15)
(samuel,60)
(jack,65)



Posted by '김용환'
,