akka 2.4.17 버전의 간단한 코드 예시이다.




액터 모델의 아카 구현에는 다음과 같은 특성이 있다.

* 액터를 생성하면 아카는 ActionRef를 제공하므로 상태를 알 수 있다.

* 액터는 실제 자바 스레드에서 실행되며 일부 액터는 동일한 스레드를 공유 할 수 있다.

* unbounded, bounded, priority 등의 세 가지 메일박스 타입이 있다. 액터는 액터를 생성할 수 있다.

* 액터는 특정 메시지를 찾는 사서함을 검색할 수 있다.

* 모든 종료된 액터의 메시지가 있는 종료 메일 박스가 있다.


자바에서는 예외를 던져 많은 조합과 시나리오로 처리해야 한다. let-it-crash 방식의 슈퍼바이저는 네 가지 옵션만 있다.

* 액터를 다시 진행하면 내부 상태는 유지된다.

* 액터를 재시작하면 내부 상태는 정리된다.

* 액터를 종료한다.

* 문제를 확대하기 위해 실패했다는 메시지를 보낸다.




1. 한 액터를 사용한 예제이다. 

import akka.actor._

class MotorActor extends Actor {
override def receive = {
case "left" => println("left!")
case "right" => println("right!")
case "up" => println("up!")
case "down" => println("down!")
case _ => println("error!")
}
}

object Main extends App {
val actorSystem = ActorSystem("MotorActor")
val motorActor = actorSystem.actorOf(Props[MotorActor], "motorActor")
motorActor ! "up"
motorActor ! "down"
motorActor ! "xxxxx"

actorSystem.terminate() }


모든 액터의 동작은 receive 메소드에서 정의된다.


receive 메소드는 모든 함수형 언어의 중요하고 강력한 부분인 패턴 일치를 사용하여 구현된다. 


패턴 매치 표현식은 첫 번째 라인과 비교하므로 switch-case 문이 사용되지 않는다. 패턴이 일치하지 않으면 두 번째 줄과 비교된다. 모든 case문의 끝에는 디폴트 case 문을 넣는 것을 잊지 말아야 한다. 디폴트 case 문은 항상 보안용이고 매치 표현식의 마지막 라인여야 한다.




실행하면 다음과 같다.


up!

down!

error!






2. 두 액터 간의 ping/pong 예시이다.


import akka.actor._
import knight76.cp.DbConnectionPoolExtension
import knight76.cp.ConnectionPool.PrintDbStats

case object PingMessage
case object PongMessage
case object StartMessage
case object StopMessage
case object ErrorMessage

object Main extends App {
val actorSystem = ActorSystem("PingPongSystem")
val pong = actorSystem.actorOf(Props[Pong], "pong")
val ping = actorSystem.actorOf(Props(new Ping(pong)), "ping")

ping ! StartMessage
actorSystem.terminate }



Ping, Pong 클래스 예시이다. 



ActorSystem은 이름을 매개 변수로 받는다. 영숫자 문자와 하이픈은 허용된다(선행 문자에는 포함되지 않는다).

ActorSystem의 actorOf 메소드를 호출하여 액터를 만든다. 나중에 보겠지만 다른 액터 내부에서 액터를 만들 수 있다. 액터가 생성될 때 액터는 비동기적으로 시작한다.

액터에 메시지를 보내려면 ! 오퍼레이션을 사용한다.

액터가 뭔가를 처리한다. 그 후에 ActorSystem이 종료된다.





참고로 ! 오퍼레이터는 액터 인스턴스가 아닌 액터 참조에서만 작동한다.

import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorRef}

class Ping(pong: ActorRef) extends Actor {
var count = 0
def incrementAndPrint {
count += 1;
println("ping")
}

def receive = {
case StartMessage =>
incrementAndPrint
pong ! PingMessage
case PongMessage =>
incrementAndPrint
if (count > 10) {
sender ! StopMessage
println("ping stopped")
context.stop(self)
} else {
sender ! PingMessage
}
case ErrorMessage =>
throw new RuntimeException
}

override def preStart(): Unit = {
println("ping, preStart..")
super.preStart()
}
}




import akka.actor.{Actor, SupervisorStrategy}

class Pong extends Actor {
def receive = {
case PingMessage =>
println(" pong")
sender ! PongMessage
case StopMessage =>
println("pong stopped")
context.stop(self)
}

override def preStart(): Unit = {
println("pong, preStart..")
super.preStart()
}

}

결과는 다음과 같다.


ping, preStart..

pong, preStart..

ping

  pong

ping

  pong

ping

  pong

ping

  pong

ping

  pong

ping

  pong

ping

  pong

ping

  pong

ping

  pong

ping

  pong

ping

ping stopped

pong stopped



여기서 Main 클래스에서 pong 객체로 ErrorMessage를 전달하면 Ping 클래스의 receive 메소드에서 처리할 수 없고 메시지를 발송할 수 없을 것이다 


import akka.actor._
import knight76.cp.DbConnectionPoolExtension
import knight76.cp.ConnectionPool.PrintDbStats

case object PingMessage
case object PongMessage
case object StartMessage
case object StopMessage
case object ErrorMessage

object Main extends App {
val actorSystem = ActorSystem("PingPongSystem")
val pong = actorSystem.actorOf(Props[Pong], "pong")
val ping = actorSystem.actorOf(Props(new Ping(pong)), "ping")

ping ! StartMessage
ping ! ErrorMessage

actorSystem.terminate }



다음과 같은 에러가 발생한다.


ping, preStart..

pong, preStart..

ping

  pong

[INFO] [04/09/2017 23:44:49.304] [PingPongSystem-akka.actor.default-dispatcher-4] [akka://PingPongSystem/user/ping] Message [knight76.PongMessage$] from Actor[akka://PingPongSystem/user/pong#-987561206] to Actor[akka://PingPongSystem/user/ping#-1979813926] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.






정리


* 액터 시스템(ActorSystem)

* 액터 시스템은 액터의 그룹이다. 그것은 다음과 같은 특성을 가지고 있다.
* 액터 시스템은 계층적이다. 모든 액터는 항상 액터 슈퍼바이저가 있다. 액터는 형제와 자식이 있을 수 있다.
* 사무실 안처럼 동일한 액터 시스템 아래의 액터는 전달자(dispatchers), 배포(deployment), 주소(address)를 공유한다.
* 액터 시스템에 액터가 만들어지고 찾을 수 있는 할 중심점이다.
* 액터 시스템은 내부적으로 스레드 컨트롤러이다. 액터 시스템은 애플리케이션에 스레드를 할당할 시기를 결정한다.
* 액터 시스템이 종료되지 않으면(system.shutdown 라인을 사용하면) 애플리케이션이 종료되지 않는다. 액터 시스템이 실행되는 동안 앱이 실행 중이다.





액터 참조(ActorRef)


* 코드에서 보다시피 ActorSystem의 actorOf 메소드에서는 액터를 비동기 적으로 시작하고 액터 참조를 리턴하는 두 가지 작업을 포함한다.

* ActorRef는 핸들(handle)이라서 액터 시스템을 중단할 수 없다.

* ActorRef는 액터의 파사드(facade)로 사용하기 때문에 액터 인스턴스를 직접 조작하거나 변수를 변경할 수 없다.

* ActorRef는 액터간의 통신 방법이다. ActorRef 메일박스에 메시지를 저장할 수 있다.

* ActorRef는 불변이기 때문에 변경할 수 없다. 참조만 할 뿐이다.

* 한 액터에는 하나의 ActorRef만 있다. 하나의 ActorRef는 하나의 액터만을 참조한다. 바로 일대일 관계이다.

* 액터 모델이 불편할 만한 내용으로는 ActorRef는 직렬화 가능하고 서버에 독립적이란 점이다. 따라서 네트워크를 통해 ActorRef를 공유, 전달, 전송할 수 있다.

* 높은 동시성 환경에서 액터 인스턴스를 직접 접근하면 위험한 환경이 될 수 있다는 점이다. ActorRef를 통해 메시지에 접근하고 메시지를 보내면 필요한 모든 ACID를 보장한다.






참고로 액터 시스템의 terminate(구 버전은 shutdown)을 호출하지 않으면 애플리케이션이 계속 실행된다.테스트 상황에서는 termniate 메소드를 호출한다.





receive 메소드 말고 액터 라이프 사이클 관련 코드를 가질 수 있다. 


* constructor : 클래스의 인스턴스가 생성될 때 호출된다(자바의 경우와 같음)

* preStart : 액터가 시작된 후 바로 호출된다.

* postStop : 액터가 정지된 후 일반적으로, 정리 작업을 위해 바로 호출된다.

* preRestart : 액터가 재시작된 후 바로 호출된다. 일반적으로 재시작은 예외로 인해 발생한다. preRestart는 매개 변수로 Throwable와 메시지를 받고 이전 객체는 해당 매개 변수를 받는다.

* postRestart : 액터가 재시작된 직후에 호출된다. 일반적으로 재시작은 예외로 인해 발생한다. postRestart는 Throwable을 매개 변수로 받는다. 새로운 객체는 매개 변수를 받고 preStart 메소드를 호출한다.



코드는 다음과 같다. 
import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorRef}

class Ping(pong: ActorRef) extends Actor {
var count = 0
def incrementAndPrint {
count += 1;
println("ping")
}

def receive = {
case StartMessage =>
incrementAndPrint
pong ! PingMessage
case PongMessage =>
incrementAndPrint
if (count > 10) {
sender ! StopMessage
println("ping stopped")
context.stop(self)
} else {
sender ! PingMessage
}
case ErrorMessage =>
throw new RuntimeException
}

override def preStart(): Unit = {
println("ping, preStart..")
super.preStart()
}

override def postStop(): Unit = {
println("ping, postStart..")
super.postStop()
}

override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
println("ping, preRestart..")
super.preRestart(reason, message)
}

override def postRestart(reason: Throwable): Unit = {
println("ping, postRestart..")
super.postRestart(reason)
}
}



이번에는 액터 간의 연결을 테스트했다. 


Main -> Pong2 -> Pong 




object Main extends App {
val actorSystem = ActorSystem("PingPongSystem")
val pong2 = actorSystem.actorOf(Props[Pong2], "pong2")

pong2 ! Hi("samuel")

Thread.sleep(10)

actorSystem.terminate }




import akka.actor.{Actor, Props}

case class Hi(name: String)
case class Stop(name: String)

class Pong2 extends Actor {
override def receive = {
case Hi(name) =>
println(s"Hi $name")
val pong = context.actorOf(Props[Pong], "pong")

pong ! PingMessage
pong ! StopMessage
case Stop(name) =>
println(s"stop.. $name")
}
}



결과는 다음과 같다.


Hi samuel

pong, preStart..

  pong

pong stopped




액터 종료 방법


* stop : stop 메소드를 수신하면 액터는 현재 메시지만 처리한다(메시지가 있는 경우에만). 새로운 메시지가 액터의 메일 박스에 도착하거나 메시지가 쌓여 있으면 버려진다.

* PoisonPill : PoisonPill 메시지는 일반적인 메시지이다. 액터의 메시지는 메일 박스에 수신되어 저장된다. PosionPill 메시지가 처리되면 액터가 중지된다.

* gracefulStop :gracefulStop 메소드를 사용하면 액터를 정상적으로 종료할 수 있다. 타임 아웃을 기다린다. 종료하기 전에 구체적인 지시 커맨드가 필요하다면, 이는 좋은 방법이다.



첫 번째 stop 방법이다.

val actorSystem = ActorSystem("PingPongSystem")
val pong = actorSystem.actorOf(Props[Pong], "pong")
val ping = actorSystem.actorOf(Props(new Ping(pong)), "ping")

ping ! StartMessage
....

actorSystem.stop(pong)
actorSystem.stop(ping)

actorSystem.terminate


결과는 다음과 같다.


...

ping stopped

pong stopped



actor system대신 PoisonPill 메시지를 보내면 동일하게 종료된다.

pong ! PoisonPill
ping ! PoisonPill


테스트하다가 아래와 같은 에러가 발생할 수 있다 .


[INFO] [04/10/2017 20:25:09.802] [PingPongSystem-akka.actor.default-dispatcher-4] [akka://PingPongSystem/user] Message [akka.actor.StopChild] from Actor[akka://PingPongSystem/deadLetters] to Actor[akka://PingPongSystem/user] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

[INFO] [04/10/2017 20:25:09.803] [PingPongSystem-akka.actor.default-dispatcher-4] [akka://PingPongSystem/user] Message [akka.actor.StopChild] from Actor[akka://PingPongSystem/deadLetters] to Actor[akka://PingPongSystem/user] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.


그 이유는 메시지를 처리할 수 없을 때 deadLetters 사서함으로 보낸다. 액터 시스템에서 deadLetters 메소드를 통해 접근할 수 있다.



종료 방법 3번째인  gracefulStop을 사용하면 메일 박스에 delivered되지 않아 deadLetter에 저장된다는 메시지를 출력되지 않고 메시지가 모두 처리된 부드럽게 종료된다. 


val actorSystem = ActorSystem("PingPongSystem")
val pong = actorSystem.actorOf(Props[Pong], "pong")
val ping = actorSystem.actorOf(Props(new Ping(pong)), "ping")

ping ! StartMessage
....
import akka.pattern.gracefulStop
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
try {
val stoppedPong: Future[Boolean] = gracefulStop(pong, 2 seconds)
Await.result(stoppedPong, 3 seconds)
println("stopped pong")

val stoppedPing: Future[Boolean] = gracefulStop(pong, 2 seconds)
Await.result(stoppedPing, 3 seconds)
println("stopped ping")
} catch {
case e:Exception => e.printStackTrace
} finally {
actorSystem.terminate
}


ping stopped

pong stopped

stopped pong

stopped ping




중지 외에 종료(kill)도 있다.


val actorSystem = ActorSystem("PingPongSystem")
val pong = actorSystem.actorOf(Props[Pong], "pong")
val ping = actorSystem.actorOf(Props(new Ping(pong)), "ping")

ping ! Kill
println("killed")


결과는 다음과 같다. Kill을 메시지로 받고 종료한다.


pong, preStart..

killed

ping, preStart..

[ERROR] [04/10/2017 20:49:34.007] [PingPongSystem-akka.actor.default-dispatcher-4] [akka://PingPongSystem/user/ping] Kill (akka.actor.ActorKilledException)

ping, postStart..





Posted by '김용환'
,