他の言語の非同期プリミティブと比べてもOxかなり良いな〜という記事です。
先日Ox v1.0.0がリリースされた。Oxはプログラミング言語Scalaの非同期処理ライブラリで、Java 21から搭載されたVirtual Threadの機能を活用して、非同期処理のための道具を使いやすい形で提供してくれる。
今回ついに1系がリリースされたので、改めてOxを紹介し、他の言語における非同期処理プリミティブと何が違うのか、どう便利なのかを紹介していければと思う。OxはScalaのライブラリだが、Scalaを知っている人も知らない人でも読めるようなコードになっているので安心してほしい。Oxの詳細はドキュメントを参考にしてほしい。
というのも、以前Oxをフィーチャーした記事を書いたことがあったが、まだメジャーバージョンが1に到達していないのもあり、あまり深く踏み込むことはしていなかったのだ。
- 従来のScala向け非同期ライブラリのアプローチとOx
- 他の言語の非同期処理プリミティブとOx
- Oxの軽量プロセス
- Oxを使ってみよう
- ox.scheduling
- ox.resilience
- チャネル
- ストリーミング処理
- Oxいいんじゃないの
従来のScala向け非同期ライブラリのアプローチとOx
Scalaのための非同期処理ライブラリは他にもたくさんあった。Cats Effect、ZIO、Kyo、などなど。しかしこれらはいずれもPurely Functional、つまり純粋な関数型パラダイムに適合することを目指して開発されていたため、利用のハードルがかなり高く、お世辞にも初心者でも使いやすいとは言えなかった・・・。
// Cats Effectの例 // 1秒ごとに"Hello!"を表示し、10秒で終わる import cats.effect.{IO, IOApp} import cats.syntax.all.* import scala.concurrent.duration.given object HelloEverySecond extends IOApp.Simple { val run: IO[Unit] = (1 to 10).toList.traverse_ { _ => IO.println("Hello!") *> IO.sleep(1.second) } }
Oxは非同期処理のための特別な型を用意せず、そのまま書けば自然に非同期処理になる、というアプローチを採用している(Direct Style)。
// Oxの例 // 1秒ごとに"Hello!"を表示し、10秒で終わる import ox.* import scheduling.{repeat, Schedule} import scala.concurrent.duration.given object HelloEverySecond extends OxApp.Simple { def run(using Ox): Unit = { fork { repeat(Schedule.fixedInterval(1.second)) { println("Hello!") } } sleep(10.seconds) } }
他の言語の非同期処理プリミティブとOx
2025年現在、多くの言語で非同期処理用のメカニズムが(Goroutineなどの機構によって)搭載されているし、言語によっては使いやすい組込みの非同期処理プリミティブが(async/awaitキーワードなどの形で)提供されている。言語によっては専用のライブラリで対応するものもある(RustのTokioなど)。
Oxは専用のライブラリによってScala 3に使いやすい非同期処理プリミティブを提供するアプローチであり、立ち位置としてはTokioにあたる。ライブラリとしてよく作り込まれており、Scala 3の機能や型システムを活かしつつ、使いやすいAPIを、高級なものから低級なものまで用意してくれている。この点については、Goの非同期プリミティブがgoroutineやsync.WaitGroupといったかなり基礎的なものに限られるのと比較すると、かなり高級なプリミティブが豊富にある印象だ:
// 高級なAPI: par import ox.par def heavyFunction1() = ??? def heavyFunction2() = ??? par(heavyFunction1(), heavyFunction2()) // 並行実行する // 低級なAPI: fork import ox.{ fork, supervised } supervised { val f = fork { heavyFunction1() } val g = fork { heavyFunction2() } (f.join(), g.join()) }
ここでは高級なAPIの例としてparを挙げたが、この他にも「タイムアウト」「繰り返し実行」「バックプレッシャ」「リトライ」「レートリミット」といった、非同期実行に限らずよく使うプリミティブが用意されており、非常に簡単に使えるようになっている。また、これらの非同期プリミティブはScala 3の機能であるinline defを活用して書かれているため、実行時オーバーヘッドが非常に小さいという特徴も持つ。
さきほども説明したが、Oxの特異な点として「それ専用の型を提供しない」を挙げることができる。通常の同期処理と、Oxによる非同期処理とは、型レベルでは何も区別されない:
// 同期的なコード def f: Int = { // ... 42 } // Oxによる非同期処理を伴うコード def g: Int = supervised { // ... forkUser { 42 } }
上掲の例でも、関数fとgのシグネチャはいずれも() => Intであり、同期か非同期かを問わずに同じように扱えるようになっている。勝手に非同期処理されて、勝手に値が入ってくれるのだ。
このことは、他の同期的に書かれたライブラリとの統合を容易にしてくれるし、ライブラリ作者からしても、ユーザに特殊な型を強いることなくそのまま利用してもらえるというメリットがある。
対照的に、例えばECMAScriptのasync/awaitを利用する世界観では、非同期処理は必ずPromise型として扱われるし、通常の同期処理と混交することはできないため、二者を馴染ませるためには工夫が必要だ。ユーザも、ライブラリがPromiseを返すのか返さないのかで対応を分けなければならない。
ここまで他言語の非同期プリミティブとOxとを比較してきたが、Oxが使いやすさにフォーカスしていることがわかってもらえると思う。
Oxの軽量プロセス
Oxは軽量プロセス(他の言語で言うところのgoroutineやErlangのprocess)としてJVMのVirtual Threadを利用している。これはプラットフォームスレッドを分割して効率的に動作させることができるため、沢山作っても影響が少ない。例えば以下の例では100万プロセスを同時に起動しているが、特に影響なく動作する:
import ox.* val proc = () => sleep((math.random()*1000).milliseconds) par( Vector.fill(1_000_000)(proc) )
Oxを使ってみよう
Oxを利用するには、以下の条件が必要だ:
- Scala 3系であること
- JVMがJava 21以上である(Virtual Threadをサポートしている)こと
以下のように依存性を定義して、Oxを利用できるようにする:
// Scala CLIの場合 //> using dep com.softwaremill.ox::core:1.0.2
// sbtの場合 libraryDependencies += "com.softwaremill.ox" %% "core" % "1.0.2"
すると、ox.以下のパッケージに便利な機能が生える。ここからは、import ox.*している前提で話を進めていく。
軽量プロセスをスポーンする
軽量プロセスを起動するには、forkを利用する。Goで言うところのgoと同じような機能である。
supervised {
fork {
println("I am forked...")
}
}
forkを利用すると、ブロックで渡した部分が軽量プロセスとして起動する。
Scalaにおいてブロックは単純な式なので、単に式を渡すだけでもよい:
supervised(fork(println("I am forked...")))
そして、supervisedを抜けるタイミングで、強制的に軽量プロセスは中断させられる。つまり、supervisedより外側で軽量プロセスが生き続けることがないことが保証されている。 言い換えれば、supervisedは軽量プロセスのスポーンが許された区間である。また、軽量プロセスが永遠に生き続けるのを防ぐために、基本的にsupervisedは必ず置かなければならないことになっている。 軽量プロセスの寿命をスコープレベルで制御するという意味で、OxではこれをStructured Concurrencyと呼んでいる。
とはいえ、supervisedを抜けて軽量プロセスが止まってしまうのでは困るということもある。軽量プロセスが終わるまで待ってほしいこともあるだろう。そんなときに使えるのがforkUserだ。
forkUserは、処理が終了するまでsupervisedを待たせることができる:
import scala.concurrent.duration.given supervised { fork { scheduling.repeat(Schedule.fixedInterval(100.milliseconds)) { println("repeat 1") } } fork { sleep(50.milliseconds) scheduling.repeat(Schedule.fixedInterval(100.milliseconds)) { println("repeat 2") } } forkUser { sleep(2.seconds) println("timeout") } }
supervisedは、そのスコープが完了するタイミングで、全てのforkを強制的に中断させるが、その一方で全てのforkUserが完了するまで待機する、というように振る舞う。
上掲の例では、repeat 1という文字列とrepeat 2という文字列とが50msおきに表示され続け、2秒後にtimeoutと表示されてsupervisedスコープを脱出する。forkUserの効果で2秒待機する処理がsupervisedの中で待たされているためだ。forkUserのかわりにforkを実行すると即座にsupervisedは終わってしまう。forkUserのおかげで、GoにおけるWaitGroupのような機構を書かずに済んでいることに注目してほしい。
ちなみに、forkした軽量プロセスはデフォルトではキャンセルできないが、キャンセル可能なforkを起動するforkCancellableというものもある。ただしより多くのVirtual Threadを利用するため、コストは通常のforkよりも高い。
ここで利用したsleepは、FiniteDurationを受け取って、その間だけ軽量プロセスをスリープさせるプリミティブである。軽量プロセスレベルの操作なので、もちろんプラットフォームスレッドをブロックするようなことはない。
repeatについてはまた後程。
処理を並行実行する
先程紹介したforkシリーズは比較的低レベルの操作だった。より高級な操作として、複数の処理を並行実行してくれるparというのもある。
par(func1(), func2())
parは、2つの処理を並行実行させ、両方が終了するまで待機し、タプルを返す。また、parはSeq[() => A]も受け取ることができるので、これを利用して任意個の並行実行ができる。この場合はSeq[A]を返す。
処理を先勝ちで並行実行する
raceSuccessはparと同じシグネチャだが、どれか一つが最初に完了した時点で他の軽量プロセスを中断させ、最初に完了した結果のみを返す:
import scala.concurrent.duration.given def f1: String = { val howLong = math.random() * 1000 sleep(howLong.milliseconds) "f1 won!" } def f2: String = { val howLong = math.random() * 1000 sleep(howLong.milliseconds) "f2 won!" } println(raceSuccess(f1, f2)) // => f2 won!
他のバリエーションとして、raceResultやraceEitherが存在する。raceSuccessは両方が失敗した時点で最初に発せられた例外を再発出するのに対して、raceResultは、最初に成功または失敗した結果を返す(つまり、1つでも例外が出たらそこで止める)。raceEitherはraceのEitherを受け取る版だ。
ちなみに、uninterruptible {}を利用することで、その区間では停止しなくなるため、クリティカルな場面で使うことができる。
parやraceSuccessで記述できる場合は、forkなどを使わずこちらで書けばよいから便利だ。
処理をタイムアウトさせる
長時間にわたってオペレーションが完了しなかった場合に中断させたい、というのはよくある要求だ。timeoutを利用することで、FiniteDurationを受け取って指定時間後に処理を中断させることができる:
import scala.concurrent.duration.given timeout(10.seconds) { veryVeryLongAndHeavyComputationOrIOOperation() } // => TimeoutException!!
timeoutEitherやtimeoutOptionというバリエーションもあり、これは例外の代わりにLeftやNoneを返してくれる。
中断しても必ず処理を実行する
ところで、タイムアウトやraceSuccessに負けるなどした場合は軽量プロセスは停止してしまう。だが、中断するときも必ず特定の処理を実行したいということはある。これを実現するのがreleaseAfterScopeだ。
releaseAfterScopeを設定しておくと、そのスコープにある全てのforkが終了した後、そのスコープ自体が完了する前のタイミングで必ず特定の処理を呼び出してくれる:
import scala.concurrent.duration.given supervised { fork { println("starting daemon fork") releaseAfterScope(println("good bye!!")) sleep(10.seconds) println("daemon fork done") } forkUser { println("starting user fork") sleep(5.seconds) println("user fork done") } }
このコードを実行すると、以下のように表示される:
starting daemon fork starting user fork user fork done good bye!!
2つの軽量プロセスが起動するが、forkで起動したプロセスはsupervisedに待ってもらえない。つまり後者のforkUserが5秒後に完了すると同時に中断させられてしまうのだが、ここでreleaseAfterScopeの効果が発動し、good bye!!と表示されている。Goのdeferみたいな感じだ。
注意が必要なのは、releaseAfterScopeで設定したトリガーが起動するのは「スコープ内の自分を含めた他のすべてのforkが完了したとき」であって、「自分が中断されたとき」ではないということだ。
リソース管理のために使いたい場合は、後述するuseやuseInScopeのほうが使いやすい。
リソースを必ず解放したい
ファイルハンドルやでかいメモリ空間、TCPコネクションの類は、OS様から借りたら返さなければならないことになっている。しかししばしばそれは忘れられ、OS様からゲンコツを喰らってプロセスごとお取り潰しにされてしまう。ありがたいことに、Oxにはリソース管理のためのプリミティブが用意されている。
useを使うとリソースの取得と解放の処理をもとに、それが有効なスコープがもらえる:
supervised {
forkUser {
use(42, n => println(s"releasing $n...")) { n =>
println(s"Here, we can use $n!!")
// let it crash
throw Exception("boom")
}
}
}
これを実行すると、例外が発出されてクラッシュするにもかかわらず、リソース解放処理が自動的にトリガーされる:
Here, we can use 42!! releasing 42... java.lang.Exception: boom
複数のリソースを作るときは、useInScopeのほうが便利だ:
supervised {
forkUser {
val n = useInScope(42)(n => println(s"releasing $n!"))
val m = useInScope(666)(n => println(s"releasing $n!"))
println(s"Yo, I'm using $n and $m!!")
}
}
リソースは確保された逆順に解放される:
Yo, I'm using 42 and 666!! releasing 666! releasing 42!
コレクションに対する並行処理
Oxは直接的な並行処理とは別に、高級APIとして並行コレクション操作を提供している。これはコレクション自体を提供するのではなく、Scalaの標準コレクションに対する並行操作をOxが提供している、という建付だ。
例えばox.mapPar(filterParなどもあるので探してみてほしい)を利用すると、指定したconcurrencyでmapを並行実行できる:
import ox.mapPar val xs = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) println(xs.mapPar(4)(_ * 2)) // => Vector(0, 2, 4, 6, 8, 10, 12, 14, 16, 18)
ox.scheduling
ここまではOxの基本的な機能を紹介してきた。これまでに紹介した機能だけでも通り一遍のものは作れるようになるはずだが、実用上はもう少し組み上げられてそのまま使えるような、お惣菜のようなプリミティブが欲しい。ここからはそんなお惣菜パッケージを紹介していく。
最初に紹介するox.schedulingパッケージは、周期的な処理や一定期間待つといった処理を提供する。
スケジューリング
ox.schedulingの核となるパーツは、「つまるところそれをいつ実行するのか」という条件を表現した型、ox.scheduling.Scheduleだ。
Scheduleには色々なパターンが用意されており、「即時」「一定間隔」「配列で指定した間隔」「Exponential backoffを利用する」などのバラエティ豊かな指定が可能だ。
この型はox.schedulingの外でも割と頻繁に利用されるので、少し覚えておくといいかもしれない。
ox.scheduling.repeat
人生は繰り返しの連続・・・というわけで押さえておきたいのがOxによる繰り返し処理だ。繰り返し?そんなモン学部生の最初で習ったわ、と思うかもしれないが、単純なループではなくて、一定間隔で起動してくれる、という意味の繰り返しだ。ox.scheduling.repeatはox.scheduling.Scheduleを受け取り、そのスケジュールに従ってブロックを繰り返してくれる:
import ox.scheduling.* import scala.concurrent.duration.given repeat(Schedule.fixedInterval(1.seconds)) { println("Harder, Better, Faster, Stronger") }
Harder, Better, Faster, Stronger Harder, Better, Faster, Stronger Harder, Better, Faster, Stronger Harder, Better, Faster, Stronger Harder, Better, Faster, Stronger Harder, Better, Faster, Stronger Harder, Better, Faster, Stronger // ...
ox.resilience
さて、次のお惣菜だ。非同期処理、というかプログラミングにおいて常に存在し続ける課題がエラーハンドリングだ。そして往々にして、失敗したら何回かは頑張ってやり直してほしい・・・。うまいこと成功させてほしい・・・。そんな欲張りを叶えてくれるのがox.resilienceパッケージだ。
このパッケージは、単なる並行処理を越えて、失敗のハンドリングやコンポーネントの保護に特化した便利なプリミティブを用意している。
リトライ処理を行う
ox.resilience.retryは、渡した処理が例外を発出して失敗した場合に再試行する仕組みを与える。retryは再試行スケジュールの戦略のためにox.scheduling.Scheduleを受け取る。
import ox.scheduling.{ Schedule, Jitter } import ox.resilience.retry import scala.concurrent.duration.given def failableFunc() = ??? // たまに失敗してしまう retry( // Exponential Backoff を10msから開始し、最大幅は1秒にし、ジッタを設定する Schedule .exponentialBackoff(10.milliseconds) .maxInterval(1.seconds) .jitter(Jitter.Equal) )(failableFunc())
これを実行すると、失敗するたびに10msからどんどん指数的に待ち時間を伸ばしつつリトライしてくれる。.maxRetriesを利用すれば、最大試行回数の設定も可能だ。
レートリミットを行う
ox.resilience.RateLimiterは、レートリミッターを提供する。また、レートリミッターの仕様も細かく設定可能だ。外部APIといった、呼び出し頻度に制約があるものや、内部APIのうちそれほど負担を与えたくない部分の呼び出しに活用できる。
レートリミッターを定義した後は、.runBlockingや.runOrDropを利用して実際の処理を割当てる。両者の違いはレートリミットに抵触したときの挙動で、.runBlockingはリクエストできるようになるまで待つのに対して、.runOrDropはリクエストできるようになるまで全てのオペレーションを無視して捨ててしまう。
import ox.resilience.RateLimiter supervised { // ウィンドウが毎秒更新されるレートリミッター。ウィンドウあたり2件まで許容する val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second) def callApi(): String = "200" // レートリミッターを利用し、レートを越えているときはブロックさせる println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) }
これを実行すると200が1秒おきに2件ずつ表示され、レートリミッターが動作していることがわかる。
また、レートリミッターはsupervisedスコープに置く必要がある。これは、レートの更新のためにバックグラウンド動作が必要なためだ。
サーキットブレーカ
ox.resilience.CircuitBreakerは、オープン・ハーフオープン・クローズドの状態を持つサーキットブレーカを提供する。
こちらは設定がやや複雑なので割愛。
チャネル
チャネルといえばGoだが、別にチャネル自体はGoの専売特許ではない。宣言的に軽量プロセス間の通信手段を提供してくれるこのプリミティブは、Oxにもox.channels以下に用意されている。
チャネルをつくる
Oxで扱うことのできるチャネルは、バッファがないチャネル、バッファの長さ制限があるチャネル、バッファの長さ制限がないチャネルだ。いずれも、ox.channels.Channelから作ることができる:
import ox.channels.* val c = Channel.rendezvous[String] // 同期チャネル val d = Channel.buffered[String](10) // バッファつきチャネル、長さ10 val e = Channel.unlimited[String] // バッファつきチャネル、長さ無限 c.done() d.done() e.error(Exception("what the heck!"))
チャネルは開いた状態と閉じた状態との2状態を持っており、.done()で閉じることができる。また、閉じた状態のチャネルには正常とエラーの2状態があり、.done()を呼ぶと正常に閉じた状態になるが、.error()を呼ぶと異常終了を通知できるようになっている。
チャネルでおくる・もらう
チャネルの送受信のために特別なことは必要なく、ただ.send()や.receive()を呼べばよい。既に閉じたチャネルに対して送受信しようとすると例外が発出される。また、送受信のためにはsupervisedスコープが必要だ:
import ox.channels.* import ox.scheduling.* import scala.concurrent.duration.given supervised { val c = Channel.rendezvous[String] fork { repeat(Schedule.fixedInterval(500.millis)) { c.send("beep!") } } fork { forever { val signal = c.receive() println(signal) } } forkUser { sleep(10.seconds) println("done") } }
このコードは3つの軽量プロセスを起動する。すなわち、チャネルに500msごとに"beep!"を送り続ける君と、ずっとチャネルを待機してそれを表示し続ける君と、10秒待ってから終わる君だ。コードを実行すると、10秒間beep!と表示され続けてからdoneが表示され、終了する。
ちなみにsend()とreceive()との代わりに、sendOrClosed()とreceiveOrClosed()とを利用できる。これはチャネルが閉じていたときに例外を投げる代わりに「閉じてるよ〜」という値を返してくれるものだ。
チャネルをselectする
これもGoではおなじみだが、selectを利用することで複数のチャネルを束ねてどれか1つから受信したり、送信用のチャネルと受信用のチャネルとを束ねたりできる。
例えば、偶数に反応する軽量プロセスと奇数に反応する軽量プロセスとをselectで繋いでみよう:
// 中カッコがすごい数になるのでfewer braces記法で書いている import ox.channels.* import scala.concurrent.duration.given supervised: val in = Channel.rendezvous[Int] val inE = Channel.rendezvous[Int] val inO = Channel.rendezvous[Int] val outE = Channel.rendezvous[String] val outO = Channel.rendezvous[String] fork: forever: val n = in.receive() inE.send(n) inO.send(n) fork: forever: if inE.receive() % 2 == 0 then outE.send("Even") fork: forever: if inO.receive() % 2 == 1 then outO.send("Odd") fork: forever: val out = select(outE, outO) println(out) fork: (0 to 100).foreach: n => in.send(n) sleep(500.millis) forkUser: sleep(10.seconds)
このコードを実行すると、EvenとOddとが交互に0.5秒おきに表示されるはずだ。
詳細は割愛するが、公式ドキュメントに送信と受信を束ねる方法が記載されている:
ストリーミング処理
前項のox.channelsではチャネルをプリミティブとして軽量プロセス間の協調を実装した。チャネルが命令的なストリーミング処理のためのAPIだとするならば、ox.flowは宣言的なストリーミング処理のためのAPIだ。
前項の処理をox.flowで書き直すと、以下のようになる:
import ox.channels.* import ox.flow.* import scala.concurrent.duration.given supervised: val cE = Channel.rendezvous[Int] val evenFlow = cE .pipe(Flow.fromSource) .filter(_ % 2 == 0) .map(_ => "Even") val cO = Channel.rendezvous[Int] val oddFlow = cO .pipe(Flow.fromSource) .filter(_ % 2 == 1) .map(_ => "Odd") val xsFlow = Flow .range(0, 100, 1) .throttle(1, 500.millis) .map { x => cE.send(x) cO.send(x) } val resultFlow = evenFlow.merge(oddFlow).map(println) fork(xsFlow.runDrain()) fork(resultFlow.runDrain()) forkUser(sleep(10.seconds))
動作は同じだ。
ox.flow.Flowに便利なプリミティブが沢山用意されているので、気になる人は探してみてほしい。ファイル操作用のプリミティブや、テキスト操作のためのプリミティブが用意されている。
Oxいいんじゃないの
ざっとOxの機能を紹介してきたが、掘り下げられなかった機能もたくさんある。サーキットブレーカの設定については触れられなかったし、ox.flowの詳細はある程度割愛させてもらった(ScalaDocを読めばだいたい分かるからだ)。
全てを紹介することができなかったにせよ、Oxは非常に柔軟で書きやすい非同期処理ライブラリだということが分かってもらえたと思う。個人的には、Scalaで利用できる非同期処理ライブラリの中で一番好きだし、バランスが取れていると思う。
また、最近の(特に)Web開発では非同期処理といえばGoという風潮があるが、Oxはそこに肉薄する、またはそれを超えるポテンシャルを秘めている。Scala 3はパワフルでありつつ、TypeScript同様にすぐれた型システムの恩恵を受けている言語だ。OxはScala 3でやや心細かった非同期処理のパーツをよく埋めてくれたと思う。os-lib、Cask、sttp、macwireといった強力なお惣菜に、また一つレパートリーが加わった形だ。
OxはJVM限定のライブラリだが、非同期処理が必要になるような局面ではJVMのスピンアップ速度はそれほど重要ではないだろうし、気になるならGraalVMを利用できる。むしろ重要なのはメンテナビリティや習得のしやすさだ。自分は最新のOxの使い方を1日でおおむね覚えてしまった。Oxは単純で便利なのだ。この点は確実に他の非同期ライブラリよりも優れている。
また、Oxは依存性も少ない(今のところ1つの外部モジュールにしか依存していない)。アーティファクトのサイズを押さえることができるし、すぐに使えて力になってくれつつも、跡を濁しにくいライブラリであるように思う。
Oxをぜひ道具箱の中に入れておき、機会があれば使ってみてはいかがだろうか。そしてあわよくば、あなたがScala未経験者なら、Scalaの門を叩いてみてはいかがだろう。
あ、Scalaのコミュニティもやっています。