注意:R2DBCはまだ本番では使わないようにね(2018-11-18現在)
昨日R2DBCを初めて触ってみて「ふーん、いまいちよく分かってないなぁ」と思いながら寝たんだけど。
朝起きたら「あ、そういうことか」ってなったので、考えの整理。
## 全体概要

実装で気にするのはConnectionFactoryを生成する部分くらいで、あとはr2dbc-spiを見れば良さそう。
r2dbc-spiは各RDBMSに対して実装しやすいようにシンプルなインターフェースになってるのでこれを直接使うとしんどい。利用者としてはこれを直接触るのではなく、r2dbc-clientみたいに人にとって使いやすい形にラッピングしてくれるやつを使う。
r2dbc-spiはSpring Frameworkとは全然関係なくて、reactive-streamsだけに依存しててPublisherを返す。r2dbc-clientやspring-data-r2dbcはMonoやFluxを使ってる。
## 今日の予定
PostgresqlConnectionFactoryを見てみる- r2dbc-spiを見てみる
r2dbc-clientとspring-data-r2dbcはまた今度見る。
## 1. PostgresqlConnectionFactoryを見てみる
PostgresqlConnectionConfigurationを受け取ってインスタンスを生成。
public PostgresqlConnectionFactory(PostgresqlConnectionConfiguration configuration) { this(Mono.defer(() -> { Objects.requireNonNull(configuration, "configuration must not be null"); return ReactorNettyClient.connect(configuration.getHost(), configuration.getPort()).cast(Client.class); }), configuration); } PostgresqlConnectionFactory(Mono<? extends Client> clientFactory, PostgresqlConnectionConfiguration configuration) { this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory must not be null"); this.configuration = Objects.requireNonNull(configuration, "configuration must not be null"); }
ClientFactoryがMonoになってて、Connectionの生成部分ではMonoを返してる。
@Override public Mono<PostgresqlConnection> create() { return this.clientFactory .delayUntil(client -> StartupMessageFlow .exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase().orElse(null), this.configuration.getUsername()) .handle(PostgresqlServerErrorException::handleErrorResponse)) .map(client -> new PostgresqlConnection(client, new DefaultCodecs(client.getByteBufAllocator()), DefaultPortalNameSupplier.INSTANCE, new IndefiniteStatementCache(client))); }
へー。こんな風に書くのか(まだリアクティブ周りよく分かってない。勉強しなきゃ。
あ、そういえば、r2dbc-postgresqlはPublisherじゃなくてMonoとFlux使ってるのか。ふーん。
### 使い方
使い方はr2dbc-clientのREADMEにこう書いてる。
PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration.builder()
.host("<host>")
.database("<database>")
.username("<username>")
.password("<password>")
.build();
R2dbc r2dbc = new R2dbc(new PostgresqlConnectionFactory(configuration));
このR2dbcはr2dbc-client側のクラスなんだけど、ConnectionFactoryを受け取って中で必要に応じてコネクションを生成して処理するんだろうな。
## 2. r2dbc-spiを見てみる
https://github.com/r2dbc/r2dbc-spi/tree/v1.0.0.M5/r2dbc-spi/src/main/java/io/r2dbc/spi
シンプル。だいたいこんな感じか。

ConnectionもStatementもPublisherを返してる。
それとこの図には書いてないけどStatementと同じ並びにBatchインターフェースがある。
### Result
Resultには2つのメソッドがあって
Publisher<Integer> getRowsUpdated();
<T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> f);
getRowsUpdatedは更新された行数をPublisherで返す。
mapはRow (とそのメタデータ)を受け取って、自分の好きな型に変換してPublisherで返す。
### 使い方
直接使うとこんな感じになる。r2dbc-postgresqlがMonoとFluxを使ってるからこんな感じに書けるけど、もし純粋にr2dbc-spiに対して書くならPublisherを使わなきゃだから、もうちょっとごちゃっとしそう?
private Flux<String> sample() { var connectionFactory = getPostgresqlConnectionFactory(); return connectionFactory.create() .flatMapMany(connection -> connection.createStatement("SELECT city FROM weather") .execute() .flatMap(result -> result.map((row, metadata) -> row.get("city", String.class) ) ) ); }
PostgresqlConnectionFactoryのcreate()で生成したMono<PostgresqlConnection>から結果のFluxを返すためにflatMapManyを使用。
コネクションからステートメントを生成して実行。
その結果のFlux<PostgresqlResult>に対するflatMapでRowからStringを生成してFlux<String>返してる。
最初にも書いた通り、実際にはこれを直接使うんじゃなくて、r2dbc-clientのようなラッパーを使う。
ふむふむ。ちょっと理解が進んだ。