[scala] Future 2

scala 2016. 11. 23. 11:49


Future에 있어서 약점은 타임아웃도 Exception이라는 점이다. 


타임아웃이 발생했을 때 Exception이 발생한다. 

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

println("Start")
val f = Future {
println("1")
Thread.sleep(2000) // 실제 코드가 들어갈 장소
println("2")
}

println("End")
val result = Await.result(f, 1 second)


결과는 예외이다. 

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [1 second]

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)

at scala.concurrent.BlockContext$DefaultBlockContext$

....




일반 Exception과 타임아웃 Exception을 구분하기 위해 클래스를 감쌀 수 있다. 


https://github.com/PacktPublishing/Scala-High-Performance-Programming/blob/master/chapter6/src/main/scala/highperfscala/concurrency/future/SafeAwait.scala



result 함수는 Future는 Awaitable을 상속받은 클래스와 시간을 받고 Future를 감싼다. TimeoutException시는 None으로 리턴한하게 한다. 

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

object SafeAwait {
def result[T](awaitable: Awaitable[T],
atMost: Duration): Option[T] = Try(Await.result(awaitable, atMost)) match {
case Success(t) => Some(t)
case Failure(_: TimeoutException) => None
case Failure(e) => throw e
}
}

val f1 = Future {
println("11")
5 + 10
}

println(SafeAwait.result(f1, 1 second))

val f2 = Future {
println("11")
Thread.sleep(5000)
5 + 10
}

println(SafeAwait.result(f2, 1 second))


결과는 다음과 같다. 


11

Some(15)

11

None




Future의 변환에서 Exception이 발생할 수 있는데, 이에 대한 Future에서 어떻게 대응할 수 있는 살펴본다.



문자를 숫자로 변환했으니. map 단계에서 변환할 것이다. 

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).map(i => {
println("Multiplying")
i * 2
})

Await.result(f, 1 second)


<결과>


Exception in thread "main" java.lang.NumberFormatException: For input string: "not-an-integer"

at java.lang.NumberFormatException.forInputString






Future에는 recover라는 메소드가 있다. 이를 활용하면, 다운 스트림 에러를 보정할 수 있다. 



def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete { v => p complete (v recover pf) }
p.future
}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recover {
case _: NumberFormatException => 2
}.map(i => {
println("Multiplying")
i * 2
})
println(Await.result(f, 1 second))

결과는 다음과 같다. 


Multiplying

4





recoverWith라는 메소드도 있다. 부분 함수를 사용할 수 있으며, recover와 동일하게 처리할 수 있다. 


def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])
         (implicit executor: ExecutionContext): Future[U] = {




import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recoverWith {
case _: NumberFormatException => Future(2)
}.map(i => {
println("Multiplying")
i * 2
})
println(Await.result(f, 1 second))

결과는 다음과 같다. 


Multiplying

4




Await.result 대신 onComplete를 사용할 수 있다. 



import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await}
import scala.util.{Failure, Success, Try }

val f = Future("stringstring").map(_.toInt).recover {
case _: NumberFormatException => 2
}.map(i => {
println("Multiplying")
i * 2
}).onComplete {
case Success(i) => println(s"$i")
case Failure(e) => println(s"${e.getMessage}")
}

Thread.sleep(1000)




Future는 이미 계산된 값으로 만들 수 있다. 그리고 특정 값을 래핑한 Future로 만들 수 있다. (Future를 리턴할 때)


예)

val stringFuture: Future[String] = Future.successful("yes")



Future.successful이나 Future.failed를 이용할 수 도 있고, 더 추상화된 클래스인 Promise를 사용할 수 있다. 아래 예시 결과에서 본 것처럼 모두 계산된 (isCompleted=true) 상태이다. 

import scala.concurrent.{Future}
import scala.concurrent.Promise

val f1 = Future.successful(3)
println(f1.isCompleted)
println(f1)

val f2 = Future.failed(new IllegalArgumentException("no!"))
println(f2.isCompleted)
println(f2)


val promise = Promise[String]()
val future = promise.future
println(promise.success("hello").isCompleted)


결과


true

scala.concurrent.impl.Promise$KeptPromise@146ba0ac

true

scala.concurrent.impl.Promise$KeptPromise@4dfa3a9d

true




'scala' 카테고리의 다른 글

[scala] 꼬리 재귀(tail recursion)와 @tailrec  (0) 2016.12.05
[scala] Future말고 Promise  (0) 2016.11.27
[scala] Future 1  (0) 2016.11.22
[zeppelin] spark의 변수 공유하기  (0) 2016.11.18
spark의 mapValues/reduceByKey 예시  (0) 2016.11.14
Posted by '김용환'
,