
[scala] Future 1

scala 2016. 11. 22. 09:31


A Scala Future can be used as simply as the following example shows.

It feels much like working with a Java Runnable (apart from the ExecutionContext).


object Main extends App {
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext

  val context: ExecutionContext = scala.concurrent.ExecutionContext.global

  println("Start")
  Future {
    println("1")
    Thread.sleep(1000) // placeholder for the real work
    println("2")
  } (context)

  println("End")

  // Wait for the result. The Future itself sleeps for 1 second,
  // so wait longer than that to be sure "2" gets printed.
  Thread.sleep(2000)
}


Output:

Start
End
1
2



Calling Thread.sleep at the end just to wait for the result is embarrassing code, so use the more elegant Await instead.



import scala.concurrent.ExecutionContext
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val context: ExecutionContext = scala.concurrent.ExecutionContext.global

println("Start")
val f = Future {
  println("1")
  Thread.sleep(1000) // placeholder for the real work
  println("2")
} (context)

println("End")
// Changed to Await. The timeout must exceed the 1-second sleep above,
// otherwise Await.result throws a TimeoutException.
val result = Await.result(f, 2.seconds)

The output is the same.
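Because Await.result throws when the timeout expires, it can help to see both outcomes side by side. The following is a minimal sketch, assuming a hypothetical helper named resultWithin and a hypothetical slowAnswer task (neither appears in the original post); it turns a timeout into None instead of letting the exception escape.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitSketch {
  // Hypothetical helper: waits for `f` up to `limit`, returns None on timeout.
  // Failures raised inside the Future itself are still rethrown.
  def resultWithin[T](f: Future[T], limit: FiniteDuration): Option[T] =
    try Some(Await.result(f, limit))
    catch { case _: TimeoutException => None }

  // Hypothetical task that takes about 300 ms.
  def slowAnswer(): Future[Int] = Future { Thread.sleep(300); 42 }

  def main(args: Array[String]): Unit = {
    println(resultWithin(slowAnswer(), 2.seconds)) // Some(42)
    println(resultWithin(slowAnswer(), 50.millis)) // None: the limit is shorter than the task
  }
}
```

Blocking with Await is convenient in small demos like these; in application code, callbacks or transformations are usually preferred so that threads are not held up.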



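Creating a Future does not mean its result is available right away. The body is handed to the ExecutionContext and runs asynchronously.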



import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global

val f = Future{ println("hello"); "world"}
println(f.value)
println(f.isCompleted)

val f1 = Future{ println("hello"); 10 + 20 }
println(f1.value)
println(f1.isCompleted)


The output looks like this (the order of the lines can differ from run to run).


hello

None

false


None

hello

false


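In both cases the Future had not completed yet when value and isCompleted were read. The println inside the body runs on another thread, which is why its position in the output varies, but the result only becomes visible once the Future completes. To use the result you have to wait for it or register a callback.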



One way to react to the result is to register a callback with onComplete.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

val f = Future { println("hello"); "world" }
println(f.value)
println(f.isCompleted)

println()
// Register a callback that runs once f has completed.
f.onComplete {
  case Success(value) => println(value)
  case Failure(e) => e.printStackTrace()
}
println(f.value)
println(f.isCompleted)


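The output looks like this. Note that onComplete does not start the computation; it only registers a callback. In this run f had already completed by the time f.value was read the second time, and the exact interleaving can vary between runs.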


hello

None

false


world

Some(Success(world))

true
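To check that the value becomes visible through completion itself, with or without a callback, here is a minimal sketch that never calls onComplete. The object and method names (CompletionSketch, completedValue) are made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object CompletionSketch {
  // Starts a Future, registers no callback, and waits until it has completed.
  def completedValue(): Option[Try[String]] = {
    val f = Future { "world" }
    Await.ready(f, 2.seconds) // blocks until f completes (or the timeout expires)
    f.value                   // defined now, although no callback was registered
  }

  def main(args: Array[String]): Unit =
    println(completedValue()) // prints Some(Success(world))
}
```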



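What onComplete does can also be split into onSuccess and onFailure, which handle one outcome each. (Both methods were deprecated in Scala 2.12 and removed in 2.13.)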


import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val f = Future { println("hello"); "world" }
println(f.value)
println(f.isCompleted)

println()
// Runs only if f completes successfully.
f.onSuccess {
  case value => println(value)
}
// Runs only if f completes with an exception.
f.onFailure {
  case e => e.printStackTrace()
}

println(f.value)
println(f.isCompleted)

The output is the same.


hello

None

false


world

Some(Success(world))

true
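On Scala versions where onSuccess and onFailure are no longer available, foreach and failed.foreach can play the same two roles. The sketch below is one way to try that out; observe is a hypothetical helper, and the Promise in it is used only to hand the callback's message back to the caller.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CallbackSketch {
  // Hypothetical helper: registers one callback per outcome and returns
  // the message produced by whichever callback actually ran.
  def observe(f: Future[String]): String = {
    val seen = Promise[String]()
    f.foreach(v => seen.trySuccess(s"success: $v"))                     // role of onSuccess
    f.failed.foreach(e => seen.trySuccess(s"failure: ${e.getMessage}")) // role of onFailure
    Await.result(seen.future, 2.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(observe(Future("world")))                             // success: world
    println(observe(Future.failed(new RuntimeException("boom")))) // failure: boom
  }
}
```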





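Note that in the Scala REPL the value usually appears to be computed already, because the Future finishes while you are typing the next line. Keep this in mind.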


scala>   import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.ExecutionContext.Implicits.global


scala> import scala.concurrent.Future

import scala.concurrent.Future


scala>  val f = Future{ println("FOO"); 10 + 2}

FOO

f: scala.concurrent.Future[Int] = List()


scala>  f.value

res2: Option[scala.util.Try[Int]] = Some(Success(12))



Once its computation has completed, a Future is just a wrapper around the computed value. If you want the computation to run again, you have to create a new Future instance.
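A counter makes this easy to observe. The following sketch (all names are hypothetical) reads the same Future twice and then creates a second one, counting how often the body actually runs.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RunOnceSketch {
  // Returns (body runs after reading one Future twice,
  //          body runs after also creating a second Future).
  def bodyRuns(): (Int, Int) = {
    val counter = new AtomicInteger(0)
    def compute(): Future[Int] = Future { counter.incrementAndGet(); 10 + 2 }

    val f = compute()
    Await.result(f, 2.seconds)
    Await.result(f, 2.seconds)         // same instance: the stored value is returned
    val afterSameFuture = counter.get()

    Await.result(compute(), 2.seconds) // new instance: the body runs again
    (afterSameFuture, counter.get())
  }

  def main(args: Array[String]): Unit =
    println(bodyRuns()) // prints (1,2)
}
```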




A Future has state.


Looking at the Future trait, you can see that it exposes whether the computation has completed and what the value is.

def isCompleted: Boolean
def value: Option[Try[T]]




A Future also offers transformation methods. Here is an example that uses map, filter, and foreach.


import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

Future(1).map(_ + 11).filter(_ % 2 == 0).foreach(println)

Thread.sleep(1000)

The result is 12.
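When the predicate passed to filter does not hold, the resulting Future fails with a NoSuchElementException rather than producing a value, so foreach would print nothing. A small sketch of both cases follows; evenAfterAdd is a hypothetical helper, and Await replaces Thread.sleep so the outcome can be inspected.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object TransformSketch {
  // Adds 11 to `start` and keeps the result only if it is even.
  def evenAfterAdd(start: Int): Try[Int] = {
    val f = Future(start).map(_ + 11).filter(_ % 2 == 0)
    Try(Await.result(f, 2.seconds)) // capture success or failure as a Try
  }

  def main(args: Array[String]): Unit = {
    println(evenAfterAdd(1))           // Success(12)
    println(evenAfterAdd(2).isFailure) // true: 13 is odd, so filter fails the Future
  }
}
```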



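Instead of passing the context explicitly as in the first examples, you can import global and have it applied implicitly. global is an implicit value, and map declares an implicit ExecutionContext parameter in its signature so that global can be picked up.

(This map is different from the map of collections.)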


implicit lazy val global: ExecutionContextExecutor = 
                 impl.ExecutionContextImpl.fromExecutor(null: Executor)
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] 
         = { // transform(f, identity)
....
}



ExecutionContext can be thought of as the system behind Future that provides asynchronous execution at runtime. It is a trait with an execute method that runs a Java Runnable.


trait ExecutionContext {

/** Runs a block of code on this execution context.
*
* @param runnable the task to execute
*/
def execute(runnable: Runnable): Unit

/** Reports that an asynchronous computation failed.
*
* @param cause the cause of the failure
*/
def reportFailure(@deprecatedName('t) cause: Throwable): Unit

/** Prepares for the execution of a task. Returns the prepared execution context.
*
* `prepare` should be called at the site where an `ExecutionContext` is received (for
* example, through an implicit method parameter). The returned execution context may
* then be used to execute tasks. The role of `prepare` is to save any context relevant
* to an execution's ''call site'', so that this context may be restored at the
* ''execution site''. (These are often different: for example, execution may be
* suspended through a `Promise`'s future until the `Promise` is completed, which may
* be done in another thread, on another stack.)
*
* Note: a valid implementation of `prepare` is one that simply returns `this`.
*
* @return the prepared execution context
*/
def prepare(): ExecutionContext = this

}
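Since the trait only requires execute and reportFailure, a custom context is short to write. The sketch below defines a hypothetical context named callerThread that runs every task directly on the thread that submits it; it is meant purely as an illustration of the trait and gives up asynchrony entirely.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CallerThreadSketch {
  // Hypothetical ExecutionContext: runs each task on the submitting thread.
  val callerThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // Returns (name of the calling thread, name of the thread that ran the body).
  def threadNames(): (String, String) = {
    val f = Future { Thread.currentThread.getName }(callerThread)
    (Thread.currentThread.getName, Await.result(f, 2.seconds))
  }

  def main(args: Array[String]): Unit = {
    val (caller, worker) = threadNames()
    println(caller == worker) // true: the body ran on the calling thread
  }
}
```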




The companion object of the ExecutionContext trait shows that an ExecutionContext can be obtained from a Java ExecutorService or Executor.


These are the factory methods that create an ExecutionContextExecutorService from an ExecutorService and an ExecutionContextExecutor from an Executor.

object ExecutionContext {

/** Creates an `ExecutionContext` from the given `ExecutorService`.
*
* @param e the `ExecutorService` to use. If `null`, a new `ExecutorService` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @param reporter a function for error reporting
* @return the `ExecutionContext` using the given `ExecutorService`
*/
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit) : ExecutionContextExecutorService =
impl.ExecutionContextImpl.fromExecutorService(e, reporter)

/** Creates an `ExecutionContext` from the given `ExecutorService` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]].
*
* If it is guaranteed that none of the executed tasks are blocking, a single-threaded `ExecutorService`
* can be used to create an `ExecutionContext` as follows:
*
* {{{
* import java.util.concurrent.Executors
* val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
* }}}
*
* @param e the `ExecutorService` to use. If `null`, a new `ExecutorService` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @return the `ExecutionContext` using the given `ExecutorService`
*/
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)

/** Creates an `ExecutionContext` from the given `Executor`.
*
* @param e the `Executor` to use. If `null`, a new `Executor` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @param reporter a function for error reporting
* @return the `ExecutionContext` using the given `Executor`
*/
def fromExecutor(e: Executor, reporter: Throwable => Unit) : ExecutionContextExecutor =
impl.ExecutionContextImpl.fromExecutor(e, reporter)

/** Creates an `ExecutionContext` from the given `Executor` with the [[scala.concurrent.ExecutionContext$.defaultReporter default reporter]].
*
* @param e the `Executor` to use. If `null`, a new `Executor` is created with [[http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext$@global:scala.concurrent.ExecutionContextExecutor default configuration]].
* @return the `ExecutionContext` using the given `Executor`
*/
def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
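
As a usage illustration of these factory methods, the sketch below wraps a fixed-size Java thread pool and runs two Futures on it. The pool size and the names PoolSketch and sumOnOwnPool are arbitrary choices made for this example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContextExecutorService, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolSketch {
  // Runs two small computations on a dedicated pool and adds their results.
  def sumOnOwnPool(): Int = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    try {
      val a = Future(10) // uses the implicit ec defined above
      val b = Future(20)
      val sum = a.flatMap(x => b.map(y => x + y))
      Await.result(sum, 2.seconds)
    } finally ec.shutdown() // the pool's threads would otherwise keep the JVM alive
  }

  def main(args: Array[String]): Unit =
    println(sumOnOwnPool()) // prints 30
}
```

Because the returned type is also an ExecutorService, the pool can be shut down through the same reference.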


The implementation of impl.ExecutionContextImpl is shown below. It consists of an inner class that extends ForkJoinTask, plus the fromExecutor and fromExecutorService factory methods.

private[concurrent] object ExecutionContextImpl {

final class AdaptedForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
final override def setRawResult(u: Unit): Unit = ()
final override def getRawResult(): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {
case anything: Throwable ⇒
val t = Thread.currentThread
t.getUncaughtExceptionHandler match {
case null ⇒
case some ⇒ some.uncaughtException(t, anything)
}
throw anything
}
}

def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
override def execute(command: Runnable) = executor.execute(command)
override def shutdown() { asExecutorService.shutdown() }
override def shutdownNow() = asExecutorService.shutdownNow()
override def isShutdown = asExecutorService.isShutdown
override def isTerminated = asExecutorService.isTerminated
override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit)
override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable)
override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t)
override def submit(runnable: Runnable) = asExecutorService.submit(runnable)
override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
}
}



The return types, ExecutionContextExecutor and ExecutionContextExecutorService, extend Java's Executor and ExecutorService respectively.




/**
* An [[ExecutionContext]] that is also a
* Java [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html Executor]].
*/
trait ExecutionContextExecutor extends ExecutionContext with Executor


/**
* An [[ExecutionContext]] that is also a
* Java [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]].
*/
trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService




Now the code below starts to make sense.


implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
def global: ExecutionContextExecutor = Implicits.global





The apply method of Future looks like this.



def apply[T](body: =>T)
              (implicit @deprecatedName('execctx) executor: ExecutionContext)
               : Future[T] = impl.Future(body)
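
To connect this signature with ExecutionContext.execute, here is a simplified sketch of how a by-name body can be wrapped in a Runnable and handed to the executor. The function later is a hypothetical stand-in written for this post, not the library's actual implementation, and it relies on Promise to produce the Future.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object ApplySketch {
  // Hypothetical stand-in for Future.apply (NOT the library's code).
  // `body` is passed by name, so it is evaluated inside run(), not at the call site.
  def later[T](body: => T)(implicit executor: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    executor.execute(new Runnable {
      def run(): Unit = promise.complete(Try(body)) // store the value or the exception
    })
    promise.future
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val f = later { println("hello"); 10 + 20 }
    println(Await.result(f, 2.seconds)) // prints 30 (after "hello")
  }
}
```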


Posted by '김용환'