CompletableFuture<T> から Mono<T> に関しては、 Mono#fromFuture(CompletableFuture) というメソッドがあるのだが、 Future<T> からは Mono<T> への変換メソッドはないし、もっと言えば Future<T> から CompletableFuture<T> への変換メソッドもない。
最初は次のような変換コードを書いていたが、あまり良さそうに見えない。
final Future<T> future = ...; final CompletableFuture<T> completableFuture = CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch(final InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); final Mono<T> mono = Mono.fromFuture(completableFuture);
いろいろ調べてたら、 MonoProcessor<T> というものがあって、これは Subscriber<T> を実装しているので、 onNext(T) というメソッドから値を渡せるし、 onError(Throwable) というメソッドから例外を伝播させられるようだ。
これを使うと次のようになる。
final Future<T> future = ...; final MonoProcessor<T> processor = MonoProcessor.create(); try { processor.onNext(future.get()); } catch(final InterruptedException | ExecutionException e) { processor.onError(e); } final Mono<T> mono = processor;
非常にシンプルにできた。
2018/05/06 0:12 追記
@making さんから、上のコードがブロックしていると指摘を受けた上で、次のようにするとよいとアドバイスをうけた。
final Future<T> future = ...; final Mono<T> mono = Mono.fromCallable(future::get) .subscribeOn(Schedulers.fromExecutore(executor));
これ、いずれの方法も、future.getをここで呼ぶとblockするのでよくないです。Futureはnon-blockingではないので、non-blockingな文脈で使いたいなら、threadをdispatchしないといけないです。こんな感じ(画像)。logを出して、onNextのスレッド名を出すとわかりやすいです。 pic.twitter.com/oxjgBRCyTn
— Toshiaki Maki (@making) 2018年4月29日