この記事では、プログラミング言語Scalaにおいて関数型ライブラリCatsをベースとした非同期処理(グリーンスレッド)ライブラリである Cats Effect(CE) において、プロセスがSIGINTを受信した場合にどのようにCEが振る舞うかを解説する。
またこの記事は執筆時点で最新であるCE 3を対象とする。
IOApp
もしあなたがCEユーザであるなら、おそらくIOAppトレイトを使ったMainオブジェクトをScalaプログラムのエントリポイントに設定している場合がほとんどだろう。
//> using scala 3.2 //> using dep "org.typelevel::cats-effect:3.5.0" import cats.effect.{IO, IOApp, ExitCode} import cats.effect.std.Console object Main extends IOApp: def run(args: List[String]) = for { _ <- IO.print("Enter your name: ") name <- Console[IO].readLine _ <- IO.println(s"Hello, $name") } yield ExitCode.Success
このプログラムを実行すると、Scalaはプロンプトとともにあなたの名前を尋ね、そして入力を待ち受ける。名前が入力されると、それを表示して終了する。
もしプログラムが入力を待ち受けている間にあなたがCtrl+Cを押下した場合、プログラムはそのまま終了する。当たり前に見えるかもしれないが、複雑な処理が行なわれていて、それはIOAppのベールの下に隠蔽されている。
CEはIOAppにシグナルハンドリングの責務を負わせている。つまり、SIGINTなどのシグナルを受け取ったとき、IOAppがこれをハンドルし、実行中の同期/非同期処理を 適切に終了 させるのだ。
例えば、runが何らかのリソースを確保している場合にSIGINTを受信した場合でも、IOAppはリソースを解放してからプログラムを終了させようとする:
//> using scala 3.2 //> using dep "org.typelevel::cats-effect:3.5.0" import cats.effect.{IO, IOApp, ExitCode, Resource} import cats.effect.std.Console object Main extends IOApp: val importantResource: Resource[IO, Int] = Resource.make(IO.println("Acquiring resource...") >> IO.pure(42))(_ => IO.println("Releasing resource...") ) def run(args: List[String]) = for { re <- importantResource.use { r => for { _ <- IO.print("Enter your name: ") name <- Console[IO].readLine _ <- IO.println(s"Hello, $name") } yield () } } yield ExitCode.Success
Acquiring resource... Enter your name: ^CReleasing resource...
上に示したように、入力待ち状態でSIGINTを送ってみるときちんとimportantResourceの解放処理が入っていることがわかる。これは、IOAppがきちんとSIGINTをハンドリングしている証左だ。
具体的にどこでシグナルがハンドリングされているか見ていこう。
IOAppはJavaランタイムのシャットダウンフックにhandleShutDownメソッドを登録する。- SIGINTがプロセスに届くと、Javaランタイムはあらかじめ登録されたフックである
handleShutDownを呼び出す。 handleShutDownは、メインFiberをキャンセルし、事前に設定されたタイムアウトまで猶予を与える。- Fiberがキャンセル完了するかタイムアウトが到来したとき、軽量スレッドの管理用ランタイムである
runtime.shutdown()が呼び出され、ネイティブスレッドなどが引き上げていき終了する。
このように、SIGINTを受信した場合はメインFiberに対してcancelメソッドが呼び出される。Fiberがキャンセルされるとき、それが呼び出しているFiberを連鎖的にキャンセルしていくので、最終的に全てのFiberが停止されるようになっている。ちなみにFiberとはCEにおける軽量スレッドで、GoのGoroutineのようなものであり、ランタイムによって制御された実行単位である。
uncancelable
シグナルによってFiberが停止されることはわかったが、中途半端な状態で停止することが許されないFiberも存在する。例えばファイルをオープンしている場合は閉じなければならないし、外部プロセスを呼び出している場合は終了まで待たなければならない。クリティカルなセクションでは停止できない。
このような目的のために、CEはuncancelableというメソッドを用意している。これが呼び出されたメソッドは、それが自発的に終了するまでキャンセル不可能になる。例えば、以下のコードは10秒間待つ間はキャンセルできない:
//> using scala 3.2 //> using dep "org.typelevel::cats-effect:3.5.0" import cats.effect.{IO, IOApp, ExitCode, Resource} import cats.effect.std.Console object Main extends IOApp: def run(args: List[String]) = for { _ <- IO.print("Waiting 10 sec") _ <- IO .sleep(scala.concurrent.duration.FiniteDuration(10, "seconds")) .uncancelable _ <- IO.println("Done!") } yield ExitCode.Success
uncancelableはいたるところで使われ、不整合な状態でプログラムが停止することを防いでいる。ちなみに、キャンセル不可能にはせずにキャンセル時に特別な処理を行うonCancelメソッドも用意されているので、うまく活用したい。
あわせて読みたい
https://typelevel.org/cats-effect/api/3.x/cats/effect/IOApp.html