CompletableFutureの公式ドキュメントはこちら
CompletableFutureはタスク並列の記述に向いています。
まずはもっとも簡単な例から見てみます。以下のメソッドを使うのが手頃です。
- supplyAsync
- runAsync
非同期でタスクを実行して、結果を表示させてみます。
@Slf4j public class AsyncTask { ExecutorService es = Executors.newCachedThreadPool(); public void sendData() { CompletableFuture.supplyAsync(this::runTask, es).thenApply(this::sendMsg) .thenAccept(this::notify); } public String runTask() { try { int time = 1000 + new Random().nextInt(3000); log.debug("receiver start (time : {})", time); Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("receiver finish"); return "findReceiver"; } public String sendMsg(String obj) { log.debug("sendMsg: " + obj); return "sendMsg"; } public void notify(String msg) { log.debug("notify: " + msg); } public static void main(String[] args) { log.debug("start"); AsyncTask task = new AsyncTask(); IntStream.range(0, 2).forEach(i -> task.sendData()); log.debug("finish"); } }
結果
タスクがmainスレッドとは別に非同期で実行されていることが分かります。
23:42:04.265 [main] DEBUG com.tsuji.sample.AsyncTask - start 23:42:04.308 [main] DEBUG com.tsuji.sample.AsyncTask - finish 23:42:04.307 [pool-1-thread-1] DEBUG com.tsuji.sample.AsyncTask - receiver start (time : 3705) 23:42:04.308 [pool-1-thread-2] DEBUG com.tsuji.sample.AsyncTask - receiver start (time : 2665) 23:42:06.973 [pool-1-thread-2] DEBUG com.tsuji.sample.AsyncTask - receiver finish 23:42:06.973 [pool-1-thread-2] DEBUG com.tsuji.sample.AsyncTask - sendMsg: findReceiver 23:42:06.973 [pool-1-thread-2] DEBUG com.tsuji.sample.AsyncTask - notify: sendMsg 23:42:08.013 [pool-1-thread-1] DEBUG com.tsuji.sample.AsyncTask - receiver finish 23:42:08.013 [pool-1-thread-1] DEBUG com.tsuji.sample.AsyncTask - sendMsg: findReceiver 23:42:08.013 [pool-1-thread-1] DEBUG com.tsuji.sample.AsyncTask - notify: sendMsg
CompletableFutureのいくつかのメソッドを利用しているので、確認します。
supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
指定されたエグゼキュータで実行されているタスクが指定されたサプライヤを呼び出して取得した値を使用して非同期的に完了する新しいCompletableFutureを返します。supplierは引数なしで何か値を返す関数といえます。 型を非同期実行するオブジェクトを生成します。オブジェクトを生成するCompletableFutureのメソッドは他にもいくつかあります。
CompletableFutureオブジェクトを生成するメソッド(抜粋)
| メソッド | 説明 |
|---|---|
| supplyAsync | Supplier<U>を非同期実行し、完了しているCompletableFuture<U>を生成 |
| runAsync | Runnableを非同期実行し、完了しているCompletableFutureを生成 |
| completedFuture | <U>型の引数をもらって、完了しているCompletableFuture<U>を生成 |
thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
このステージが正常に完了したときに、このステージの結果を指定された関数への引数に設定して実行される新しいCompletionStageを返します。Futureの結果に対して、Functionを実行する...という理解がしっくりきました。
thenAccept
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
このステージが正常に完了したときに、このステージの結果を指定されたアクションへの引数に設定して実行される新しいCompletionStageを返します。Futureの結果に対して、Consumerを実行する...という理解がしっくりきました。Consumerなので結果は戻りません。
次に自前でCompletableFutureのインスタンスを生成して、非同期処理を通知する仕組みを実装してみます。なんとなくSpringBootで作ってみます。
@Slf4j @Service public class MyTask { @Autowired Executor executor; @Async public CompletableFuture<String> runTask() { try { int time = 1000 + new Random().nextInt(3000); log.info("sub task start (time : {})", time); Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } log.info("sub task finish"); return CompletableFuture.completedFuture("ok"); } @Async public CompletableFuture<String> runFailTask() { try { int time = 1000 + new Random().nextInt(3000); log.info("sub fail task start (time : {})", time); Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } log.info("sub fail task finish"); throw new RuntimeException("my exception"); } }
@EnableAsync @SpringBootApplication public class MySpringApplication { @Autowired MyTask task; public static void main(String[] args) { SpringApplication.run(MySpringApplication.class, args); } @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(3); executor.initialize(); return executor; } @Service class Invoker implements CommandLineRunner { Logger log = LoggerFactory.getLogger(Invoker.class); @Async @Override public void run(String... args) throws Exception { task.runTask().whenComplete((ret, ex) -> { if (ex == null) { // 成功した場合 log.info("result={}", ret); } else { // 失敗した場合 log.error("err={}", ex); } }); task.runFailTask().whenComplete((ret, ex) -> { if (ex == null) { log.info("result={}", ret); } else { log.error("err={}", ex); } }); } } }
結果
whenComplete で想定通りハンドリングできていることが分かります。 CompletableFuture.completedFuture を用いることで自前でCompletableFutureを生成することができました。これを用いればいろいろな非同期処理を便利に扱うことができそうです。
... 2019-06-06 00:58:00.530 INFO 15468 --- [ main] com.tsuji.sample.MySpringApplication : Started MySpringApplication in 1.237 seconds (JVM running for 2.225) 2019-06-06 00:58:00.538 INFO 15468 --- [ taskExecutor-3] com.tsuji.sample.MyTask : sub fail task start (time : 2604) 2019-06-06 00:58:00.538 INFO 15468 --- [ taskExecutor-2] com.tsuji.sample.MyTask : sub task start (time : 3564) 2019-06-06 00:58:03.144 INFO 15468 --- [ taskExecutor-3] com.tsuji.sample.MyTask : sub fail task finish 2019-06-06 00:58:03.145 ERROR 15468 --- [ taskExecutor-3] c.t.sample.MySpringApplication$Invoker : err=java.util.concurrent.CompletionException: java.lang.RuntimeException: my exception 2019-06-06 00:58:04.103 INFO 15468 --- [ taskExecutor-2] com.tsuji.sample.MyTask : sub task finish 2019-06-06 00:58:04.103 INFO 15468 --- [ taskExecutor-2] c.t.sample.MySpringApplication$Invoker : result=ok 2019-06-06 00:58:10.237 INFO 15468 --- [on(3)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested. 2019-06-06 00:58:10.239 INFO 15468 --- [on(3)-127.0.0.1] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'taskExecutor'
whenComplete
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
whenCompleteメソッドでは、結果に関係なくアクションのインジェクションが可能ですが、それ以外についてはその完了の結果が保持されます
他にもいろいろ使い方はありそうですが、とりあえずこんなところです。