これは、なにをしたくて書いたもの?
Spring Frameworkには、タスク実行とスケジューリングの機能があります。
今回は、タスク実行にフォーカスして見ていきたいと思います。
具体的には、TaskExecutorとThreadPoolTaskExecutor、@Asyncアノテーションを扱います。
Task Execution
Spring FrameworkのTask Execution…タスクの実行に関するドキュメントは、こちらです。
Spring Frameworkでは、JavaのExecutorを抽象化したTaskExecutorを提供しています。
The Spring TaskExecutor Abstraction
Java SE環境であってもJava EE環境であっても、こちらを使うことでタスク実行に関する実装の詳細が隠蔽されます。
TaskExecutorはExecutorを継承したインターフェースであり、その実装はSpring Frameworkが提供しています。
Spring Frameworkの利用者自身が、TaskExecutorの実装を作成することはほとんどないでしょう。
TaskExecutor (Spring Framework 5.3.6 API)
TaskExecutorの実装には、以下の種類があります。
SyncTaskExecutor… タスクの実行は別スレッドで行うものの、タスクを非同期では実行しない実装。テストなど、マルチスレッドが不要な状況下で使用するSimpleAsyncTaskExecutor… 呼び出しごとに、新しいスレッドを作成して実行する実装。同時に実行できるタスクの数は制限できるConcurrentTaskExecutor…java.util.concurrent.Executorのアダプター。通常は、ThreadPoolTaskExecutorを使用すればよいThreadPoolTaskExecutor… 最も一般的な実装。java.util.concurrent.ThreadPoolExecutorをTaskExecutorとしてラップしている。他のExecutorを使いたい場合は、ConcurrentTaskExecutorの利用を検討するWorkManagerTaskExecutor… CommonJ WorkManager(WebSphere、WebLogic)をバックエンドに使用する実装DefaultManagedTaskExecutor… Java EE環境におけるManagedExecutorServiceをバックエンドに使用する実装
いずれにせよ、TaskExecutorはJavaのExecutorインターフェースの拡張なので、使い方も同じになります。
Executor (Java SE 11 & JDK 11 )
よく使われそうなThreadPoolTaskExecutorはThreadPoolExecutorをラップしているので、こちらも参照すると
よいでしょう。
ThreadPoolExecutor (Java SE 11 & JDK 11 )
ThreadPoolTaskExecutor (Spring Framework 5.3.6 API)
Spring Bootを使った場合は、ThreadPoolTaskExecutorが自動構成されます。
TaskExecutorの使用例はこちら。
また、@Asyncはスケジュール実行に関する機能なのですが、今回使ってみます。
@EnableAsyncを使って有効化したうえで、処理を行うBeanのメソッドに@Asyncアノテーションを付与すると、
指定された(またはデフォルトの)TaskExecutorを使って非同期実行が行われます。
今回は、TaskExecutorとThreadPoolTaskExecutor、@Asyncアノテーションを使っていきます。
環境
今回の環境は、こちらです。
$ java --version openjdk 11.0.10 2021-01-19 OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.20.04) OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.20.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.10, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-72-generic", arch: "amd64", family: "unix"
Spring Bootは2.4.5を使い、依存するSpring Frameworkは5.3.6となります。
Spring Bootプロジェクトの作成
Spring Initializrを使って、Spring Bootプロジェクトを作成します。
今回のアプリケーションはWebではなく、CommandLineRunnerで確認するくらいで良いかな、と思ったので依存関係は
特に指定していません。
$ curl -s https://start.spring.io/starter.tgz \ -d bootVersion=2.4.5 \ -d javaVersion=11 \ -d name=task-execution \ -d groupId=org.littlewings \ -d artifactId=task-execution \ -d version=0.0.1-SNAPSHOT \ -d packageName=org.littlewings.spring.task \ -d baseDir=task-execution | tar zxvf - $ cd task-execution
dependenciesをなにも指定しない場合は、依存関係がこれくらいのpom.xmlになります。
<properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
起動クラスは、こちらで。
src/main/java/org/littlewings/spring/task/App.java
package org.littlewings.spring.task; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication //@EnableAsync public class App { public static void main(String[] args) { SpringApplication.run(App.class, args); } }
コメントアウトしている@EnableAsyncアノテーションは、後述します。
実行は、以下のコマンドで行っているものとします。
$ mvn spring-boot:run
あと、CommandLineRunnerインターフェースの実装クラスは試したい機能ごとに作っていくのですが、そのまま増やすと
全部実行していってしまうので、紹介が終わったら以下のように@Componentアノテーションをコメントアウトしていく
ものとします。
// @Component public class SimpleRunner implements CommandLineRunner {
まずは使ってみる
では、使っていってみます。
こちらを参考に
こんなクラスを作成。
src/main/java/org/littlewings/spring/task/SimpleRunner.java
package org.littlewings.spring.task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @Component public class SimpleRunner implements CommandLineRunner { Logger logger = LoggerFactory.getLogger(SimpleRunner.class); TaskExecutor taskExecutor; public SimpleRunner(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } @Override public void run(String... args) throws Exception { taskExecutor.execute(() -> { logger.info("Hello World!!"); logger.info("TaskExecutor name = {}", taskExecutor.getClass().getName()); }); taskExecutor.execute(() -> logger.info("Oops!!")); ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) taskExecutor; logger.info("core pool size = {}, max pool size = {}, keep-alive seconds = {}", threadPoolTaskExecutor.getCorePoolSize(), threadPoolTaskExecutor.getMaxPoolSize(), threadPoolTaskExecutor.getKeepAliveSeconds()); } }
DIする型は、TaskExecutorインターフェースでも大丈夫です。
TaskExecutor taskExecutor;
public SimpleRunner(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
TaskExecutor#executeメソッドに、Runnableを渡すことでタスクを実行できます。今回は、ログ出力をしているだけです。
TaskExecutorの実装クラス名も出力しています。
taskExecutor.execute(() -> {
logger.info("Hello World!!");
logger.info("TaskExecutor name = {}", taskExecutor.getClass().getName());
});
taskExecutor.execute(() -> logger.info("Oops!!"));
TaskExecutorインターフェースができるのは、これだけです。
TaskExecutor (Spring Framework 5.3.6 API)
タスクからの戻り値も取得できません。
実体はThreadPoolTaskExecutorなので、キャストして設定を見てみましょう。
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) taskExecutor;
logger.info("core pool size = {}, max pool size = {}, keep-alive seconds = {}", threadPoolTaskExecutor.getCorePoolSize(), threadPoolTaskExecutor.getMaxPoolSize(), threadPoolTaskExecutor.getKeepAliveSeconds());
実行結果。
2021-04-25 22:36:55.048 INFO 43283 --- [ task-1] o.littlewings.spring.task.SimpleRunner : Hello World!! 2021-04-25 22:36:55.049 INFO 43283 --- [ task-2] o.littlewings.spring.task.SimpleRunner : Oops!! 2021-04-25 22:36:55.049 INFO 43283 --- [ task-1] o.littlewings.spring.task.SimpleRunner : TaskExecutor name = org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 2021-04-25 22:36:55.048 INFO 43283 --- [ main] o.littlewings.spring.task.SimpleRunner : core pool size = 8, max pool size = 2147483647, keep-alive seconds = 60 2021-04-25 22:37:55.052 INFO 43283 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
ログのスレッド名を見ると、task-となっていて、別スレッドで実行されていることがわかります。
使用されているThreadPoolTaskExecutorがAutoConfigureされているのは、こちらですね。
プロパティ設定は、こちらを。
デフォルトでは8スレッドをコアサイズとして持ち、最大スレッド数はInteger.MAX_VALUEまで広がるスレッドプールが
構成されます。
ところでこのプログラム、実行するとしばらく終了しません。
停止まで1分かかっています。
2021-04-25 22:36:55.048 INFO 43283 --- [ main] o.littlewings.spring.task.SimpleRunner : core pool size = 8, max pool size = 2147483647, keep-alive seconds = 60 2021-04-25 22:37:55.052 INFO 43283 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
spring.task.execution.pool.allow-core-thread-timeoutがデフォルトでtrueになっていてタスクを実行したスレッドはタイムアウト
する設定になっているのですが、spring.task.execution.pool.keep-aliveがデフォルトで60秒なので、この間は待ち続けます。
ちょっと長いので、今回は3秒にしておきましょう。
src/main/resources/application.properties
spring.task.execution.pool.keep-alive=3s
今度は、3秒で終了するようになりました。
2021-04-25 22:44:36.363 INFO 43800 --- [ task-1] o.littlewings.spring.task.SimpleRunner : Hello World!! 2021-04-25 22:44:36.364 INFO 43800 --- [ task-2] o.littlewings.spring.task.SimpleRunner : Oops!! 2021-04-25 22:44:36.364 INFO 43800 --- [ task-1] o.littlewings.spring.task.SimpleRunner : TaskExecutor name = org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 2021-04-25 22:44:36.364 INFO 43800 --- [ main] o.littlewings.spring.task.SimpleRunner : core pool size = 8, max pool size = 2147483647, keep-alive seconds = 3 2021-04-25 22:44:39.369 INFO 43800 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
ThreadPoolTaskExecutorを使う
先ほどはTaskExecutorインターフェースのままタスクを実行しましたが、TaskExecutor#executeだとRunnableしか
実行できないのでちょっと不便です…。
もう、ThreadPoolTaskExecutorでDIしていいのではないのでしょうか?という気分になります。
というわけで、そのように書いてみました。
src/main/java/org/littlewings/spring/task/ThreadPoolTaskExecutorRunner.java
package org.littlewings.spring.task; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @Component public class ThreadPoolTaskExecutorRunner implements CommandLineRunner { Logger logger = LoggerFactory.getLogger(ThreadPoolTaskExecutorRunner.class); ThreadPoolTaskExecutor taskExecutor; public ThreadPoolTaskExecutorRunner(ThreadPoolTaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } @Override public void run(String... args) throws Exception { List<Future<?>> futures = new ArrayList<>(); futures.add(taskExecutor.submit(() -> { logger.info("calc task start"); try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { // ignore } return 1 + 2; })); futures.add(taskExecutor.submit(() -> { logger.info("message task start"); try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { // ignore } return "Hello World!!"; })); for (Future<?> future : futures) { logger.info("future get = {}", future.get()); } } }
やっていることは、Callableを使って結果を返すようにしてFutureを受け取り、
futures.add(taskExecutor.submit(() -> {
logger.info("calc task start");
try {
TimeUnit.SECONDS.sleep(3L);
} catch (InterruptedException e) {
// ignore
}
return 1 + 2;
}));
futures.add(taskExecutor.submit(() -> {
logger.info("message task start");
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
// ignore
}
return "Hello World!!";
}));
最後に待ち合わせているだけです。
for (Future<?> future : futures) { logger.info("future get = {}", future.get()); }
実行結果。
2021-04-25 22:53:40.708 INFO 44289 --- [ task-1] o.l.s.task.ThreadPoolTaskExecutorRunner : calc task start 2021-04-25 22:53:40.709 INFO 44289 --- [ task-2] o.l.s.task.ThreadPoolTaskExecutorRunner : message task start 2021-04-25 22:53:43.710 INFO 44289 --- [ main] o.l.s.task.ThreadPoolTaskExecutorRunner : future get = 3 2021-04-25 22:53:45.710 INFO 44289 --- [ main] o.l.s.task.ThreadPoolTaskExecutorRunner : future get = Hello World!!
ログを見ても、別スレッドで実行されていることが確認できますし、結果も取得できています。
@Asyncを使って非同期実行する
次は、@Asyncを使って非同期実行してみましょう。@Asyncアノテーションは、スケジュール実行側の機能です。
最初に、うまくいかない例を書いてみます。
src/main/java/org/littlewings/spring/task/MessageService.java
package org.littlewings.spring.task; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; @Service public class MessageService { Logger logger = LoggerFactory.getLogger(MessageService.class); @Async public String getMessage() { logger.info("get message start"); Thread.dumpStack(); try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { // ignore } return "Hello World"; } }
こんな感じで、非同期実行したいメソッドに@Asyncアノテーションを付与します。スタックトレースの様子も見たいので、
Thread#dumpStackも含めておきました。
@Async public String getMessage() { logger.info("get message start"); Thread.dumpStack(); try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { // ignore } return "Hello World"; }
こちらを使うクラスを作成。このパターンでは、TaskExecutorやThreadPoolTaskExecutorは、最後までソースコードに現れません。
src/main/java/org/littlewings/spring/task/UseAsyncRunner.java
package org.littlewings.spring.task; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class UseAsyncRunner implements CommandLineRunner { Logger logger = LoggerFactory.getLogger(UseAsyncRunner.class); MessageService messageService; public UseAsyncRunner(MessageService messageService) { this.messageService = messageService; } @Override public void run(String... args) throws Exception { String message = messageService.getMessage(); logger.info("return message = {}", message); } }
実行してログをよく見ると、単に同じスレッドで動いているだけです。
2021-04-25 23:06:59.317 INFO 45078 --- [ main] o.l.spring.task.MessageService : get message start
java.lang.Exception: Stack trace
at java.base/java.lang.Thread.dumpStack(Thread.java:1388)
at org.littlewings.spring.task.MessageService.getMessage(MessageService.java:22)
at org.littlewings.spring.task.UseAsyncRunner.run(UseAsyncRunner.java:22)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:819)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:803)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:346)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1340)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329)
at org.littlewings.spring.task.App.main(App.java:11)
2021-04-25 23:07:02.318 INFO 45078 --- [ main] o.l.spring.task.UseAsyncRunner : return message = Hello World
この機能を使うには、@EnableAsyncアノテーションを使う必要があるからです。
これで、@Asyncアノテーションが機能するようになります。
@SpringBootApplication @EnableAsync public class App {
再度実行。
2021-04-25 23:08:43.523 INFO 45212 --- [ main] o.l.spring.task.UseAsyncRunner : return message = null
2021-04-25 23:08:43.534 INFO 45212 --- [ task-1] o.l.spring.task.MessageService : get message start
java.lang.Exception: Stack trace
at java.base/java.lang.Thread.dumpStack(Thread.java:1388)
at org.littlewings.spring.task.MessageService.getMessage(MessageService.java:22)
at org.littlewings.spring.task.MessageService$$FastClassBySpringCGLIB$$ace3701c.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
今度は、別スレッドで動くようになりました。スタックトレースを見ると、インターセプターが入っていますね。
ですが、メソッドの戻り値が取得できていません。
2021-04-25 23:08:43.523 INFO 45212 --- [ main] o.l.spring.task.UseAsyncRunner : return message = null
ドキュメントをちゃんと読むと、値を返す場合はFutureを使う必要があるようです。
Even methods that return a value can be invoked asynchronously. However, such methods are required to have a Future-typed return value. This still provides the benefit of asynchronous execution so that the caller can perform other tasks prior to calling get() on that Future.
ドキュメントを見ていると戻り値がvoidの例ばかりですが、voidの場合だと結果を待たない(待てない)非同期実行になります。
メソッドの戻り値が不要であれば、voidにして実行してもよいでしょう。
今回は戻り値を使いたいので、メソッドの戻り値がFutureとなるように変更してみます。
src/main/java/org/littlewings/spring/task/MessageService.java
package org.littlewings.spring.task; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; @Service public class MessageService { Logger logger = LoggerFactory.getLogger(MessageService.class); @Async public Future<String> getMessage() { logger.info("get message start"); Thread.dumpStack(); try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { // ignore } // return "Hello World"; return new AsyncResult<>("Hello World"); } }
ドキュメントがvoidの例ばかりだったので、Futureを返すにはどうしたら…?という気分になりましたが、AsyncResultを
使うのが良さそうです。
AsyncResult (Spring Framework 5.3.6 API)
呼び出し元も変更。
src/main/java/org/littlewings/spring/task/UseAsyncRunner.java
package org.littlewings.spring.task; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class UseAsyncRunner implements CommandLineRunner { Logger logger = LoggerFactory.getLogger(UseAsyncRunner.class); MessageService messageService; public UseAsyncRunner(MessageService messageService) { this.messageService = messageService; } @Override public void run(String... args) throws Exception { Future<String> message = messageService.getMessage(); logger.info("return message = {}", message.get()); } }
では、実行。
2021-04-25 23:43:14.929 INFO 47028 --- [ task-1] o.l.spring.task.MessageService : get message start
java.lang.Exception: Stack trace
at java.base/java.lang.Thread.dumpStack(Thread.java:1388)
at org.littlewings.spring.task.MessageService.getMessage(MessageService.java:37)
at org.littlewings.spring.task.MessageService$$FastClassBySpringCGLIB$$ace3701c.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2021-04-25 23:43:17.931 INFO 47028 --- [ main] o.l.spring.task.UseAsyncRunner : return message = Hello World
OKですね。
ちなみに、Future以外にもSpringが提供するListenableFuture、そしてJava 8以降のCompletableFutureも戻り値として
使えます。
ドキュメントに記載がありますし、
@Async methods may not only declare a regular java.util.concurrent.Future return type but also Spring’s org.springframework.util.concurrent.ListenableFuture or, as of Spring 4.2, JDK 8’s java.util.concurrent.CompletableFuture, for richer interaction with the asynchronous task and for immediate composition with further processing steps.
このあたりを見ればわかります。
呼び出し元は、こちら。メソッドをCallableでラップして実行します。
この時使われるTaskExecutorは、これまで使ってきたSpring Bootでデフォルト構成されたThreadPoolTaskExecutorです。
また、今回は扱いませんが、例外処理についてはこちらのようです。
Exception Management with @Async
自分でThreadPoolTaskExecutorのBeanを作成する
最後に、自分でThreadPoolTaskExecutorのBeanを作成してみましょう。
こんな感じで、2つのThreadPoolTaskExecutorのBeanを登録。
src/main/java/org/littlewings/spring/task/MyTaskExecutorConfiguration.java
package org.littlewings.spring.task; import java.time.Duration; import org.springframework.boot.task.TaskExecutorBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration public class MyTaskExecutorConfiguration { @Bean("largePoolTaskExecutor") public ThreadPoolTaskExecutor largePoolTaskExecutor() { TaskExecutorBuilder builder = new TaskExecutorBuilder(); return builder .corePoolSize(16) .maxPoolSize(16) .keepAlive(Duration.ofSeconds(2L)) .queueCapacity(Integer.MAX_VALUE) .threadNamePrefix("large-pool-task-") .allowCoreThreadTimeOut(true) .build(); } @Bean("smallPoolTaskExecutor") public ThreadPoolTaskExecutor smallPoolTaskExecutor() { TaskExecutorBuilder builder = new TaskExecutorBuilder(); return builder .corePoolSize(2) .maxPoolSize(2) .keepAlive(Duration.ofSeconds(2L)) .queueCapacity(Integer.MAX_VALUE) .threadNamePrefix("small-pool-task-") .allowCoreThreadTimeOut(true) .build(); } }
ThreadPoolTaskExecutorの作成には、TaskExecutorBuilderを使うのが良いでしょう。
The auto-configured TaskExecutorBuilder allows you to easily create instances that reproduce what the auto-configuration does by default.
Both a TaskExecutorBuilder bean and a TaskSchedulerBuilder bean are made available in the context if a custom executor or scheduler needs to be created.
TaskExecutorBuilder (Spring Boot 2.4.5 API)
使い方は、Spring Boot自身が参考になります。
今回は、サイズ固定のスレッドプールを2つ用意しました。
@Bean("largePoolTaskExecutor") public ThreadPoolTaskExecutor largePoolTaskExecutor() { TaskExecutorBuilder builder = new TaskExecutorBuilder(); return builder .corePoolSize(16) .maxPoolSize(16) .keepAlive(Duration.ofSeconds(2L)) .queueCapacity(Integer.MAX_VALUE) .threadNamePrefix("large-pool-task-") .allowCoreThreadTimeOut(true) .build(); } @Bean("smallPoolTaskExecutor") public ThreadPoolTaskExecutor smallPoolTaskExecutor() { TaskExecutorBuilder builder = new TaskExecutorBuilder(); return builder .corePoolSize(2) .maxPoolSize(2) .keepAlive(Duration.ofSeconds(2L)) .queueCapacity(Integer.MAX_VALUE) .threadNamePrefix("small-pool-task-") .allowCoreThreadTimeOut(true) .build(); }
定義したThreadPoolTaskExecutorを使う側のクラス。
src/main/java/org/littlewings/spring/task/CustomTaskExecutorRunner.java
package org.littlewings.spring.task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.CommandLineRunner; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @Component public class CustomTaskExecutorRunner implements CommandLineRunner { Logger logger = LoggerFactory.getLogger(CustomTaskExecutorRunner.class); ThreadPoolTaskExecutor largePoolTaskExecutor; ThreadPoolTaskExecutor smallPoolTaskExecutor; public CustomTaskExecutorRunner(@Qualifier("largePoolTaskExecutor") ThreadPoolTaskExecutor taskExecutor1, @Qualifier("smallPoolTaskExecutor") ThreadPoolTaskExecutor taskExecutor2) { this.largePoolTaskExecutor = taskExecutor1; this.smallPoolTaskExecutor = taskExecutor2; } @Override public void run(String... args) throws Exception { largePoolTaskExecutor.execute(() -> logger.info("hello")); smallPoolTaskExecutor.execute(() -> logger.info("world")); logger.info("pool prefix = {}, core pool size = {}, max pool size = {}, keep-alive seconds = {}", largePoolTaskExecutor.getThreadNamePrefix(), largePoolTaskExecutor.getCorePoolSize(), largePoolTaskExecutor.getMaxPoolSize(), largePoolTaskExecutor.getKeepAliveSeconds()); logger.info("pool prefix = {}, core pool size = {}, max pool size = {}, keep-alive seconds = {}", smallPoolTaskExecutor.getThreadNamePrefix(), smallPoolTaskExecutor.getCorePoolSize(), smallPoolTaskExecutor.getMaxPoolSize(), smallPoolTaskExecutor.getKeepAliveSeconds()); } }
ThreadPoolTaskExecutorの区別は、@Qualifierアノテーションで行うことにしました。
public CustomTaskExecutorRunner(@Qualifier("largePoolTaskExecutor") ThreadPoolTaskExecutor taskExecutor1, @Qualifier("smallPoolTaskExecutor") ThreadPoolTaskExecutor taskExecutor2) { this.largePoolTaskExecutor = taskExecutor1; this.smallPoolTaskExecutor = taskExecutor2; }
今回のThreadPoolTaskExecutorの使い道は、単にメッセージのログ出力と、設定値の確認のためのログ出力くらいですけどね。
実行結果。
2021-04-26 00:07:16.757 INFO 48670 --- [rge-pool-task-1] o.l.s.task.CustomTaskExecutorRunner : hello 2021-04-26 00:07:16.758 INFO 48670 --- [all-pool-task-1] o.l.s.task.CustomTaskExecutorRunner : world 2021-04-26 00:07:16.758 INFO 48670 --- [ main] o.l.s.task.CustomTaskExecutorRunner : pool prefix = large-pool-task-, core pool size = 16, max pool size = 16, keep-alive seconds = 2 2021-04-26 00:07:16.758 INFO 48670 --- [ main] o.l.s.task.CustomTaskExecutorRunner : pool prefix = small-pool-task-, core pool size = 2, max pool size = 2, keep-alive seconds = 2
設定した内容が反映されているようです。
OKですね。
このように自分で定義したTaskExecutorを@Asyncアノテーションで指定するには、@Asyncアノテーションのvalueに
指定すればよいみたいです。
Executor Qualification with @Async
今回は、扱いませんが。
まとめ
Spring FrameworkのTask Executionを、TaskExecutorとThreadPoolTaskExecutor、@Asyncアノテーションに絞って
使ってみました。
JavaのExecutorServiceとそう変わらないのだろうと思っていたのですが、それが正解でもあり、固有のこともあった感じですね。
勉強になりました、と。