以下の内容はhttps://tutuz-tech.hatenablog.com/entry/2019/06/06/083903より取得しました。


CompletableFutureでJavaの非同期処理を試す

CompletableFutureの公式ドキュメントはこちら

CompletableFutureはタスク並列の記述に向いています。

まずはもっとも簡単な例から見てみます。以下のメソッドを使うのが手頃です。

  • supplyAsync
  • runAsync

非同期でタスクを実行して、結果を表示させてみます。

@Slf4j
public class AsyncTask {

  ExecutorService es = Executors.newCachedThreadPool();

  public void sendData() {
    CompletableFuture.supplyAsync(this::runTask, es).thenApply(this::sendMsg)
        .thenAccept(this::notify);
  }

  public String runTask() {
    try {
      int time = 1000 + new Random().nextInt(3000);
      log.debug("receiver start (time : {})", time);
      Thread.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    log.debug("receiver finish");
    return "findReceiver";
  }

  public String sendMsg(String obj) {
    log.debug("sendMsg: " + obj);
    return "sendMsg";
  }

  public void notify(String msg) {
    log.debug("notify: " + msg);
  }

  public static void main(String[] args) {
    log.debug("start");
    AsyncTask task = new AsyncTask();
    IntStream.range(0, 2).forEach(i -> task.sendData());
    log.debug("finish");
  }
}

結果
タスクがmainスレッドとは別に非同期で実行されていることが分かります。

23:42:04.265 [main] DEBUG com.tsuji.sample.AsyncTask - start
23:42:04.308 [main] DEBUG com.tsuji.sample.AsyncTask - finish
23:42:04.307 [pool-1-thread-1] DEBUG com.tsuji.sample.AsyncTask - receiver start (time : 3705)
23:42:04.308 [pool-1-thread-2] DEBUG com.tsuji.sample.AsyncTask - receiver start (time : 2665)
23:42:06.973 [pool-1-thread-2] DEBUG com.tsuji.sample.AsyncTask - receiver finish
23:42:06.973 [pool-1-thread-2] DEBUG com.tsuji.sample.AsyncTask - sendMsg: findReceiver
23:42:06.973 [pool-1-thread-2] DEBUG com.tsuji.sample.AsyncTask - notify: sendMsg
23:42:08.013 [pool-1-thread-1] DEBUG com.tsuji.sample.AsyncTask - receiver finish
23:42:08.013 [pool-1-thread-1] DEBUG com.tsuji.sample.AsyncTask - sendMsg: findReceiver
23:42:08.013 [pool-1-thread-1] DEBUG com.tsuji.sample.AsyncTask - notify: sendMsg

CompletableFutureのいくつかのメソッドを利用しているので、確認します。

supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
指定されたエグゼキュータで実行されているタスクが指定されたサプライヤを呼び出して取得した値を使用して非同期的に完了する新しいCompletableFutureを返します。
supplierは引数なしで何か値を返す関数といえます。 型を非同期実行するオブジェクトを生成します。オブジェクトを生成するCompletableFutureのメソッドは他にもいくつかあります。

CompletableFutureオブジェクトを生成するメソッド(抜粋)

メソッド 説明
supplyAsync Supplier<U>を非同期実行し、完了しているCompletableFuture<U>を生成
runAsync Runnableを非同期実行し、完了しているCompletableFutureを生成
completedFuture <U>型の引数をもらって、完了しているCompletableFuture<U>を生成
thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
このステージが正常に完了したときに、このステージの結果を指定された関数への引数に設定して実行される新しいCompletionStageを返します。
Futureの結果に対して、Functionを実行する...という理解がしっくりきました。
thenAccept
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
このステージが正常に完了したときに、このステージの結果を指定されたアクションへの引数に設定して実行される新しいCompletionStageを返します。
Futureの結果に対して、Consumerを実行する...という理解がしっくりきました。Consumerなので結果は戻りません。

次に自前でCompletableFutureのインスタンスを生成して、非同期処理を通知する仕組みを実装してみます。なんとなくSpringBootで作ってみます。

@Slf4j
@Service
public class MyTask {

  @Autowired
  Executor executor;

  @Async
  public CompletableFuture<String> runTask() {
    try {
      int time = 1000 + new Random().nextInt(3000);
      log.info("sub task start (time : {})", time);
      Thread.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    log.info("sub task finish");

    return CompletableFuture.completedFuture("ok");
  }

  @Async
  public CompletableFuture<String> runFailTask() {
    try {
      int time = 1000 + new Random().nextInt(3000);
      log.info("sub fail task start (time : {})", time);
      Thread.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    log.info("sub fail task finish");

    throw new RuntimeException("my exception");
  }
}
@EnableAsync
@SpringBootApplication
public class MySpringApplication {

  @Autowired
  MyTask task;

  public static void main(String[] args) {
    SpringApplication.run(MySpringApplication.class, args);
  }

  @Bean
  public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(3);
    executor.initialize();
    return executor;
  }

  @Service
  class Invoker implements CommandLineRunner {

    Logger log = LoggerFactory.getLogger(Invoker.class);

    @Async
    @Override
    public void run(String... args) throws Exception {
      task.runTask().whenComplete((ret, ex) -> {
        if (ex == null) {
          // 成功した場合
          log.info("result={}", ret);
        } else {
          // 失敗した場合
          log.error("err={}", ex);
        }
      });

      task.runFailTask().whenComplete((ret, ex) -> {
        if (ex == null) {
          log.info("result={}", ret);
        } else {
          log.error("err={}", ex);
        }
      });
    }
  }
}

結果
whenComplete で想定通りハンドリングできていることが分かります。 CompletableFuture.completedFuture を用いることで自前でCompletableFutureを生成することができました。これを用いればいろいろな非同期処理を便利に扱うことができそうです。

...
2019-06-06 00:58:00.530  INFO 15468 --- [           main] com.tsuji.sample.MySpringApplication     : Started MySpringApplication in 1.237 seconds (JVM running for 2.225)
2019-06-06 00:58:00.538  INFO 15468 --- [ taskExecutor-3] com.tsuji.sample.MyTask                  : sub fail task start (time : 2604)
2019-06-06 00:58:00.538  INFO 15468 --- [ taskExecutor-2] com.tsuji.sample.MyTask                  : sub task start (time : 3564)
2019-06-06 00:58:03.144  INFO 15468 --- [ taskExecutor-3] com.tsuji.sample.MyTask                  : sub fail task finish
2019-06-06 00:58:03.145 ERROR 15468 --- [ taskExecutor-3] c.t.sample.MySpringApplication$Invoker   : err=java.util.concurrent.CompletionException: java.lang.RuntimeException: my exception
2019-06-06 00:58:04.103  INFO 15468 --- [ taskExecutor-2] com.tsuji.sample.MyTask                  : sub task finish
2019-06-06 00:58:04.103  INFO 15468 --- [ taskExecutor-2] c.t.sample.MySpringApplication$Invoker   : result=ok
2019-06-06 00:58:10.237  INFO 15468 --- [on(3)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested.
2019-06-06 00:58:10.239  INFO 15468 --- [on(3)-127.0.0.1] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'taskExecutor'
whenComplete
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
whenCompleteメソッドでは、結果に関係なくアクションのインジェクションが可能ですが、それ以外についてはその完了の結果が保持されます

他にもいろいろ使い方はありそうですが、とりあえずこんなところです。

参考




以上の内容はhttps://tutuz-tech.hatenablog.com/entry/2019/06/06/083903より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14