akka-streamを検討しているので今一度Actorについておさらい。
サンプル
import akka.actor.{Actor, ActorSystem, Props}
object Main extends App {
val system = ActorSystem("sample")
val actor = system.actorOf(Props[SampleActor])
Range(1, 10).foreach(actor ! _)
}
class SampleActor extends Actor {
override def preStart(): Unit = {
super.preStart()
println("start : " + this)
}
override def postStop(): Unit = {
println("stop : " + this)
super.postStop()
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
println("restart : " + this)
}
def receive = {
case x => println(x)
}
}
Acotorにメッセージが送られるので、以下のような結果になる。
start : SampleActor@5f1fb65d 1 2 3 4 5 6 7 8 9
3のとき自爆させてみる
def receive = {
case 3 => throw new RuntimeException("invalid value")
case x => println(x)
}
再起動されている。
start : SampleActor@24764cc2 1 2 stop : SampleActor@24764cc2 start : SampleActor@d5ee644 restart : SampleActor@d5ee644 4 5 6 7 8 9
デフォルトのスーパーバイザーのストラテジーが例外でリスタートするようになっている。
final val defaultDecider: Decider = {
case _: ActorInitializationException ⇒ Stop
case _: ActorKilledException ⇒ Stop
case _: DeathPactException ⇒ Stop
case _: Exception ⇒ Restart
}
デフォルトをいじるのがややめんどくさそなのでSupervisorを使う。
class SampleSupervisor extends Actor {
val actor = context.actorOf(Props[SampleActor])
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
override def preStart(): Unit = {
super.preStart()
println("start : " + this)
}
def receive = {
case x => actor ! x
}
}
3のところで停止し、4以降処理されなくなる。
start : SampleSupervisor@668e93b9 start : SampleActor@ddad94 1 2 stop : SampleActor@ddad94 [ERROR] [02/03/2019 11:48:42.257] [sample-akka.actor.default-dispatcher-3] [akka://sample/user/$a/$a] invalid value java.lang.RuntimeException: invalid value
メッセージの再送は行われない
Actorモデルを勉強を始めたときに勘違いしていたのが、再起動かけたらエラーになった処理を再処理してくれるかというところで、結論をいうとメッセージを再送しない限りは再処理しない。メッセージを再送する仕組みは作り込むかすればできそうだけど、もっと上位で再処理できるようにした方がいいと思う(最後にコミットして、途中でこけたやつは次回の処理対象になる、など)
次はakka-streamについて書く。