これは、なにをしたくて書いたもの?
Java 21で正式版になった、JEP 444(Virtual Threads)を試しておきたいなということで。
スレッドに関するAPIも変わっているようなので、こちらも合わせて。
なお、スレッドダンプの取得やHTTPサーバー/クライアントを使った例、そしてpinning(ピン留め)が発生した時にスタックトレースを
出力する方法は別エントリーに書いています。
Virtual Threadsを使ってHTTPサーバー/クライアントを書いて、スレッドまわりの動きを確認してみる(スレッドダンプの取得付き) - CLOVER🍀
JEP 444(Virtual Threads)のpinning(ピン留め)をシステムプロパティjdk.tracePinnedThreadsによるスタックトレースの出力で確認する - CLOVER🍀
JEP 444: Virtual Threads
Virtual ThreadsはJEP 444で導入されました。
Java 21のドキュメントでは、こちらに記載があります。
主な内容
JEP 444の内容を見ていってみます。ちょっとわかりにくいですが、Virtual Threadsの特定のスレッドインスタンスを指す場合は
「仮想スレッド」と表記するようにしたつもりです。
- 目標
- 目標としていないもの
Virtual Threadsが導入されたモチベーションとしては、
- 既存のスレッドはOSスレッドのラッパーとして実装されているため(プラットフォームスレッド)使用可能なスレッドが制限されており、
またコストも高いため、スレッド数がリクエストをスレッドに割り当てて処理を行うサーバーアプリケーションの場合、CPUなどのリソースを使い切る前にスレッド数が制限要因になってしまう - 非同期スタイルはリクエストを異なるスレッドがインターリーブ方式で扱うため、スタックトレースが扱いづらい、プログラミングスタイルがJavaプラットフォームと相容れないという問題がある
- Virtual Threadsを導入することで、ブロッキングIO操作を呼び出すと仮想スレッドを再開するまで一時停止することができるようになり、高いスループットを実現できるようになる
といったところのようです。IOでブロックしている間に、別の仮想スレッドを動かせる、というものみたいですね。
また、仮想スレッドはプールする必要はないそうです。
デバッガー、JDK Flight Recorderは仮想スレッドを扱うことができ、スレッドダンプも新しい形式になったようです。
動作に関する記述を少し見ていきます。
- プロセッサーへの割当
- OSスレッドとして実装されるプラットフォームスレッドの場合はOSのスケジューラーに依存するが、Virtual Threadsの場合はJDK独自のスケジューラーに依存し、仮想スレッドをプラットフォームスレッドに割り当てる
- 仮想スレッドをプラットフォームスレッドに割り当てることをマウントと呼び、実行終了または停止中にはアンマウントされる
- この状態でプラットフォームスレッドは従来どおりOSによってスケジューリングされる
- Virtual Threadsのスケジューラー
Thread#currentThread- 仮想スレッドで実行されているものであれば、仮想スレッドを返す
- マウント/アンマウント
- 仮想スレッドは、IOまたはブロック操作などによりブロックされるとアンマウントされ、その後再度マウントされ再開される
synchronizedメソッドまたはブロック内の処理はアンマウントできない- 代わりに
ReentrantLockを使うこと - 将来的に改善される可能性がある
- 代わりに
nativeメソッドまたはForeign Functionを実行中はアンマウントできない
- ピン留め
APIの変更
JEP 444を見る限りは、Virtual Threadsの導入にあたり新しいクラスが追加されたわけではなさそうですね。
Thread.Builderが微妙なところです。
いくつか挙げておきましょう。
- スレッドの作成
Thread.Builder、Thread#ofVirtual、Thread#ofPlatformを使うThread#startVirtualThreadで仮想スレッドを作成、開始できる
- 仮想スレッドかどうかの判定
Thread#isVirtual
Thread#getAllStackTracesで返すのは、すべてのプラットフォームスレッドのMap- API上の仮想スレッドとプラットフォームスレッドの違い
Threadのパブリックなコンストラクターでは、仮想スレッドを作成できない- 仮想スレッドは常にデーモンスレッドで、
Thread#setDaemonで非デーモンスレッドに変更できない - 仮想スレッドの優先度は常に固定で
Thread.NORM_PRIORITY- 将来的にこの制限を再検討する可能性はある
- 仮想スレッドはスレッドグループのアクティブなメンバーではなく、仮想スレッドに対して
Thread#getThreadGroupを呼び出すとVirtualThreadsという名前のスレッドグループが返される - 仮想スレッドに対するSecurityManagerの権限はない
- 仮想スレッドは
ThreadLocalを利用可能- ただし
ThreadLocalは多数のスレッドが作成される仮想スレッドからのアクセスを想定すると重いので、Scoped Valuesが導入予定
- ただし
java.util.concurrent.LockSupportはVirtual ThreadsをサポートするLockSupportを使用するAPI(Lock、Semaphore、BlockingQueueなど)が仮想スレッドから呼び出された時に機能するようになった
- タスクごとにスレッドを作成する
Executors#newThreadPerTaskExecutorとExecutors#newVirtualThreadPerTaskExecutorによりスレッドプールを使う既存のコードからの移行と相互運用が可能に - ネットワーク
java.netおよびjava.nio.channelsパッケージのネットワークAPIの実装はVirtual Threadsをサポートし、たとえば仮想スレッド上で読み取りをブロックする操作を行うとプラットフォームスレッドをアンマウント、開放するjava.net.Socket、ServerSocket、DatagramSocketは仮想スレッドで呼び出された時にインタラプト可能になり、インタラプトおよびキャンセルができるようになった- ソケット上でブロックされている仮想スレッドを中断すると、スレッドが解除されソケットもクローズされる
java.iojava.ioパッケージはsynchronizedが多用されており、Virtual Threadsの導入のために変更が必要だったBufferedInputStream、BufferedOutputStream、BufferedReader、BufferedWriter、PrintStream、PrintWriterはsynchronizedではなく明示的なロックを使うようになった- これらのクラスをサブクラス化しても問題ない
InputStreamReaderおよびOutputStreamWriterから利用されるデコーダー、エンコーダーはInputStreamReader、OutputStreamWriterと同じロックを使うようになった
- JMX
ThreadMXBeanがサポートするのはプラットフォームスレッドのみThreadMXBean#findDeadlockedThreadsが検出するのはデッドロック状態のプラットフォームスレッドであり、仮想スレッドは検出されない
JNI、デバッグ、JDK Flight Recorderについては今回は省略します。
あとは、Java 21のドキュメントのこのあたりを見ていくのもよいと思います。書かれている内容のイメージが付きやすいように、
含まれているセクションのタイトルも記載しておきます。
- コア・ライブラリ / 並行処理 / 仮想スレッド
- プラットフォーム・スレッド
- 仮想スレッドとは
- 仮想スレッドを使用する理由
- コア・ライブラリ / 並行処理 / 仮想スレッド / 仮想スレッドの作成と実行
ThreadクラスおよびThread.Builderインタフェースを使用した仮想スレッドの作成Executors.newVirtualThreadPerTaskExecutor()メソッドを使用した仮想スレッドの作成と実行- マルチスレッド・クライアント・サーバーの例
- コア・ライブラリ / 並行処理 / 仮想スレッド / 仮想スレッドのスケジュールおよび固定された仮想スレッド
- コア・ライブラリ / 並行処理 / 仮想スレッド / 仮想スレッドのデバッグ
- 仮想スレッドのJava Flight Recorderイベント
- jcmdスレッド・ダンプでの仮想スレッドの表示
- コア・ライブラリ / 並行処理 / 仮想スレッド / 仮想スレッド: 採用ガイド
とりあえず、今回はスレッドまわりのAPIを簡単に試してみたいと思います。
スレッドダンプの取得や、HTTPサーバー/クライアントを使った仮想スレッドのマウント/アンマウントあたりの切り替え、
pinning(ピン留め)が発生した時にスタックトレースを出力する方法は別のエントリーで確認しています。
Virtual Threadsを使ってHTTPサーバー/クライアントを書いて、スレッドまわりの動きを確認してみる(スレッドダンプの取得付き) - CLOVER🍀
JEP 444(Virtual Threads)のpinning(ピン留め)をシステムプロパティjdk.tracePinnedThreadsによるスタックトレースの出力で確認する - CLOVER🍀
環境
今回の環境はこちら。
$ java --version openjdk 21.0.1 2023-10-17 OpenJDK Runtime Environment (build 21.0.1+12-Ubuntu-222.04) OpenJDK 64-Bit Server VM (build 21.0.1+12-Ubuntu-222.04, mixed mode, sharing) $ mvn --version Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 21.0.1, vendor: Private Build, runtime: /usr/lib/jvm/java-21-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.15.0-91-generic", arch: "amd64", family: "unix"
準備
Maven依存関係などはこちら。
<properties> <maven.compiler.source>21</maven.compiler.source> <maven.compiler.target>21</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.10.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.24.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>3.1.2</version> </plugin> </plugins> </build>
確認は、テストコードで行うことにします。
新しいスレッドに関するAPIを使う
それでは、今回は新しいスレッドに関するAPIを使っていってみましょう。
プラットフォームスレッド
まずは既存のプラットフォームスレッドから。このあたりを直接使うことはないと思いますが。
src/test/java/org/littlewings/virtualthreads/ThreadApiForPlatformThreadTest.java
package org.littlewings.virtualthreads; import org.junit.jupiter.api.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; class ThreadApiForPlatformThreadTest { @Test void createSimply() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread thread = Thread.ofPlatform().start(latch::countDown); assertThat(thread.getThreadGroup().getName()).isEqualTo("main"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isFalse(); assertThat(thread.getThreadGroup()).isNull(); } @Test void createFromConstructor() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread thread = new Thread(latch::countDown); thread.start(); assertThat(thread.getThreadGroup().getName()).isEqualTo("main"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isFalse(); assertThat(thread.getThreadGroup()).isNull(); } @Test void createFromBuilder() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofPlatform(); builder.name("my-thread"); Thread thread = builder.start(latch::countDown); assertThat(thread.getThreadGroup().getName()).isEqualTo("main"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isFalse(); assertThat(thread.getThreadGroup()).isNull(); } @Test void createFromBuilderUnstarted() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofPlatform(); builder.name("my-thread"); Thread thread = builder.unstarted(latch::countDown); assertThat(thread.getState()).isEqualTo(Thread.State.NEW); assertThat(thread.getThreadGroup().getName()).isEqualTo("main"); thread.start(); assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isFalse(); assertThat(thread.getThreadGroup()).isNull(); } @Test void registerThreadGroup() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread thread = Thread.ofPlatform().group(new ThreadGroup("my-group")).start(latch::countDown); assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isFalse(); assertThat(thread.getThreadGroup()).isNull(); } }
以下で、スレッドを作成して開始できます。
Thread thread = Thread.ofPlatform().start(latch::countDown);
Threadのコンストラクタを使ってスレッドを作成すると、プラットフォームスレッドになります。
Thread thread = new Thread(latch::countDown); thread.start(); ... assertThat(thread.isVirtual()).isFalse();
Thread.Builderというものが導入されたということでしたが、これはThread#ofPlatform(Virtual Threadsの場合はThread#ofVirtual)で
返ってくるものがThread.Builderのインスタンスみたいですね。
Thread.Builder builder = Thread.ofPlatform();
builder.name("my-thread");
Thread thread = builder.start(latch::countDown);
Thread.Builder#unstartedで、まだ開始していないスレッドを作成することもできます。
Thread.Builder builder = Thread.ofPlatform();
builder.name("my-thread");
Thread thread = builder.unstarted(latch::countDown);
assertThat(thread.getState()).isEqualTo(Thread.State.NEW);
assertThat(thread.getThreadGroup().getName()).isEqualTo("main");
thread.start();
assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE);
Virtual Threads
続いて、Virtual Threads。こちらも、これらのAPIを直接使うことはないと思います。
src/test/java/org/littlewings/virtualthreads/ThreadApiForVirtualThreadTest.java
package org.littlewings.virtualthreads; import org.junit.jupiter.api.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; class ThreadApiForVirtualThreadTest { @Test void createSimply() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread thread = Thread.ofVirtual().start(latch::countDown); assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isTrue(); assertThat(thread.getThreadGroup()).isNull(); } @Test void createFromBuilder() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofVirtual(); builder.name("my-thread"); Thread thread = builder.start(latch::countDown); assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isTrue(); assertThat(thread.getThreadGroup()).isNull(); } @Test void createFromBuilderUnstarted() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofVirtual(); builder.name("my-thread"); Thread thread = builder.unstarted(latch::countDown); assertThat(thread.getState()).isEqualTo(Thread.State.NEW); assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads"); thread.start(); assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isTrue(); assertThat(thread.getThreadGroup()).isNull(); } /* @Test void registerThreadGroup() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); // Thread.Builder.OfVirtualにgroupがない Thread thread = Thread.ofVirtual().group(new ThreadGroup("my-group")).start(latch::countDown); assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isFalse(); assertThat(thread.getThreadGroup()).isNull(); } */ @Test void currentThreadIsVirtualThread() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread.ofVirtual().name("my-virtual-thread").start(() -> { Thread currentThread = Thread.currentThread(); assertThat(currentThread.getName()).isEqualTo("my-virtual-thread"); assertThat(currentThread.isVirtual()).isTrue(); latch.countDown(); }); latch.await(); } @Test void cannotSetNonDaemon() { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofVirtual(); Thread thread = builder.unstarted(latch::countDown); assertThat(thread.getState()).isEqualTo(Thread.State.NEW); assertThat(thread.isDaemon()).isTrue(); assertThatThrownBy(() -> thread.setDaemon(false)) .isExactlyInstanceOf(IllegalArgumentException.class) .hasMessage("'false' not legal for virtual threads"); } @Test void cannotChangePriority() { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofVirtual(); Thread thread = builder.unstarted(latch::countDown); assertThat(thread.getState()).isEqualTo(Thread.State.NEW); assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY); thread.setPriority(Thread.MAX_PRIORITY); assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY); } @Test void useThreadLocal() throws InterruptedException { CountDownLatch setLatch = new CountDownLatch(3); CountDownLatch finishLatch = new CountDownLatch(3); ThreadLocal<String> names = new ThreadLocal<>(); Thread thread1 = Thread.ofVirtual().unstarted(() -> { names.set("thread1"); setLatch.countDown(); try { setLatch.await(); } catch (InterruptedException e) { // ignore } assertThat(names.get()).isEqualTo("thread1"); names.remove(); finishLatch.countDown(); }); Thread thread2 = Thread.ofVirtual().unstarted(() -> { names.set("thread2"); setLatch.countDown(); try { setLatch.await(); } catch (InterruptedException e) { // ignore } assertThat(names.get()).isEqualTo("thread2"); names.remove(); finishLatch.countDown(); }); Thread thread3 = Thread.ofVirtual().unstarted(() -> { names.set("thread3"); setLatch.countDown(); try { setLatch.await(); } catch (InterruptedException e) { // ignore } assertThat(names.get()).isEqualTo("thread3"); names.remove(); finishLatch.countDown(); }); thread1.start(); thread2.start(); thread3.start(); finishLatch.await(); } }
Thread#ofVirtualで作成したスレッドのインスタンスは、仮想スレッドになります。
Thread thread = Thread.ofVirtual().start(latch::countDown);
assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads");
latch.await();
TimeUnit.MILLISECONDS.sleep(300L);
assertThat(thread.isVirtual()).isTrue();
この時のスレッドグループの名前は、確かにVirtualThreadsですね。
Thread.Builderの使い方は、プラットフォームスレッドと大差ないですね。
@Test void createFromBuilder() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofVirtual(); builder.name("my-thread"); Thread thread = builder.start(latch::countDown); assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isTrue(); assertThat(thread.getThreadGroup()).isNull(); } @Test void createFromBuilderUnstarted() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofVirtual(); builder.name("my-thread"); Thread thread = builder.unstarted(latch::countDown); assertThat(thread.getState()).isEqualTo(Thread.State.NEW); assertThat(thread.getThreadGroup().getName()).isEqualTo("VirtualThreads"); thread.start(); assertThat(thread.getState()).isEqualTo(Thread.State.RUNNABLE); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isTrue(); assertThat(thread.getThreadGroup()).isNull(); }
一方で、仮想スレッドに対してはスレッドグループは設定できません。Thread#ofVirtualで返されるThread.Builderではスレッドグループを
指定できません。
/* @Test void registerThreadGroup() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); // Thread.Builder.OfVirtualにgroupがない Thread thread = Thread.ofVirtual().group(new ThreadGroup("my-group")).start(latch::countDown); assertThat(thread.getThreadGroup().getName()).isEqualTo("my-group"); latch.await(); TimeUnit.MILLISECONDS.sleep(300L); assertThat(thread.isVirtual()).isFalse(); assertThat(thread.getThreadGroup()).isNull(); } */
Thread.Builderと言っていますが、厳密にはプラットフォームスレッドの時はThread.Builder.OfPlatform、仮想スレッドの時は
Thread.Builder.OfVirtualが返ってきています。
Thread#currentThreadは、現在の仮想スレッドのインスタンスを返します。
@Test void currentThreadIsVirtualThread() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Thread.ofVirtual().name("my-virtual-thread").start(() -> { Thread currentThread = Thread.currentThread(); assertThat(currentThread.getName()).isEqualTo("my-virtual-thread"); assertThat(currentThread.isVirtual()).isTrue(); latch.countDown(); }); latch.await(); }
非デーモンスレッドに変更しようとすると、例外がスローされます。
@Test void cannotSetNonDaemon() { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofVirtual(); Thread thread = builder.unstarted(latch::countDown); assertThat(thread.getState()).isEqualTo(Thread.State.NEW); assertThat(thread.isDaemon()).isTrue(); assertThatThrownBy(() -> thread.setDaemon(false)) .isExactlyInstanceOf(IllegalArgumentException.class) .hasMessage("'false' not legal for virtual threads"); }
実装箇所。
優先度を変更しようとすると無視されます。
@Test void cannotChangePriority() { CountDownLatch latch = new CountDownLatch(1); Thread.Builder builder = Thread.ofVirtual(); Thread thread = builder.unstarted(latch::countDown); assertThat(thread.getState()).isEqualTo(Thread.State.NEW); assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY); thread.setPriority(Thread.MAX_PRIORITY); assertThat(thread.getPriority()).isEqualTo(Thread.NORM_PRIORITY); }
実装箇所。
ThreadLocalも使えますね。
@Test void useThreadLocal() throws InterruptedException { CountDownLatch setLatch = new CountDownLatch(3); CountDownLatch finishLatch = new CountDownLatch(3); ThreadLocal<String> names = new ThreadLocal<>(); Thread thread1 = Thread.ofVirtual().unstarted(() -> { names.set("thread1"); setLatch.countDown(); try { setLatch.await(); } catch (InterruptedException e) { // ignore } assertThat(names.get()).isEqualTo("thread1"); names.remove(); finishLatch.countDown(); }); Thread thread2 = Thread.ofVirtual().unstarted(() -> { names.set("thread2"); setLatch.countDown(); try { setLatch.await(); } catch (InterruptedException e) { // ignore } assertThat(names.get()).isEqualTo("thread2"); names.remove(); finishLatch.countDown(); }); Thread thread3 = Thread.ofVirtual().unstarted(() -> { names.set("thread3"); setLatch.countDown(); try { setLatch.await(); } catch (InterruptedException e) { // ignore } assertThat(names.get()).isEqualTo("thread3"); names.remove(); finishLatch.countDown(); }); thread1.start(); thread2.start(); thread3.start(); finishLatch.await(); }
Executors#newVirtualThreadPerTaskExecutorを使う
最後に、Executors#newVirtualThreadPerTaskExecutorを使ってみます。通常、Virtual Threadsを使う時はこちらではないかなと
思います。
src/test/java/org/littlewings/virtualthreads/ExecutorForVirtualThreadTest.java
package org.littlewings.virtualthreads; import org.junit.jupiter.api.Test; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; class ExecutorForVirtualThreadTest { @Test void virtualThreadPerTask() throws ExecutionException, InterruptedException { AtomicInteger sum = new AtomicInteger(); try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) { List<Future<Integer>> futures = IntStream .rangeClosed(1, 10000) .mapToObj(i -> es.submit(() -> sum.addAndGet(i))) .toList(); for (Future<Integer> f : futures) { f.get(); } } assertThat(sum.get()).isEqualTo(50005000); } }
Executors#newVirtualThreadPerTaskExecutorで、タスクごとに仮想スレッドのインスタンスが生成されます。
背後で使われているのは、Thread#ofVirtual#factoryですね。
public static ExecutorService newVirtualThreadPerTaskExecutor() { ThreadFactory factory = Thread.ofVirtual().factory(); return newThreadPerTaskExecutor(factory); }
ところで、Java 19でExecutorServiceがAutoCloseableになったようです。
try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
これ、どういう実装になっているか気になるところですね。
まずExecutorService#shutdownを呼び出しシャットダウン処理を開始(新規のタスクは受付拒否)して、実行中のタスクがある場合は
1日待つようです。
@Override default void close() { boolean terminated = isTerminated(); if (!terminated) { shutdown(); boolean interrupted = false; while (!terminated) { try { terminated = awaitTermination(1L, TimeUnit.DAYS); } catch (InterruptedException e) { if (!interrupted) { shutdownNow(); interrupted = true; } } } if (interrupted) { Thread.currentThread().interrupt(); } } }
今回は、このくらいにしておきます。
オマケ
Virtual ThreadsのスケジューラーはForkJoinPoolであり、そのパラメーターはシステムプロパティで調整可能だということでしたが、
それはこのあたりのようです。
@SuppressWarnings("removal") private static ForkJoinPool createDefaultScheduler() { ForkJoinWorkerThreadFactory factory = pool -> { PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); return AccessController.doPrivileged(pa); }; PrivilegedAction<ForkJoinPool> pa = () -> { int parallelism, maxPoolSize, minRunnable; String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); if (parallelismValue != null) { parallelism = Integer.parseInt(parallelismValue); } else { parallelism = Runtime.getRuntime().availableProcessors(); } if (maxPoolSizeValue != null) { maxPoolSize = Integer.parseInt(maxPoolSizeValue); parallelism = Integer.min(parallelism, maxPoolSize); } else { maxPoolSize = Integer.max(parallelism, 256); } if (minRunnableValue != null) { minRunnable = Integer.parseInt(minRunnableValue); } else { minRunnable = Integer.max(parallelism / 2, 1); } Thread.UncaughtExceptionHandler handler = (t, e) -> { }; boolean asyncMode = true; // FIFO return new ForkJoinPool(parallelism, factory, handler, asyncMode, 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); }; return AccessController.doPrivileged(pa); }
これを見ると、デフォルトの並列度は確かに使用できるプロセッサー数ですね。スレッドプールの最大値はデフォルトでは
並列度と256の大きい方、最小値は並列度と1の大きい方のようです。
おわりに
Java 21で正式版になったJEP 444(Virtual Threads)を試してみました。
今回はとりあえずスレッドに関するAPIまわりを見ていってみましたが、スレッドダンプなどはまた気になるところなので、それは
別の機会に見てみようと思います。