jOOQで組み立てたクエリをConnectionFactoryへ直発行する仕組みを作ったのでまとめます。
なお、これは事情が有って作ったものであり、本来一切推奨できないコードであることはご承知おき下さい。
コード全体は以下の通りです。
基本的にはSelectクエリでの利用を想定しています。
import io.r2dbc.spi.Connection import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Readable import io.r2dbc.spi.Result import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.asFlow import org.jooq.AttachableQueryPart import org.jooq.Field import org.jooq.JSONB import org.jooq.Select import org.jooq.conf.ParamType import org.reactivestreams.Publisher import org.springframework.r2dbc.connection.ConnectionHolder import org.springframework.transaction.NoTransactionException import org.springframework.transaction.reactive.TransactionSynchronizationManager import reactor.core.publisher.Flux // コネクションに対するSQL発行 private fun Connection.query(sql: AttachableQueryPart): Publisher<out Result> = createStatement(sql.getSQL(ParamType.INLINED)).execute() // 呼び出し用 fun <T> ConnectionFactory.read(sql: Select<*>, mapper: (ContextualReadable) -> T): Flow<T> { // トランザクションが開始されていればそのコネクションから、非開始なら新しく取得したコネクションからクエリを発行する val flux = TransactionSynchronizationManager .forCurrentTransaction() .flatMapMany { manager -> val con = (manager.getResource(this) as ConnectionHolder).connection con.query(sql) } .onErrorResume(NoTransactionException::class.java) { // closeが漏れるとコネクションリークするので注意、書き方はr2dbc-pool推奨より // https://github.com/r2dbc/r2dbc-pool?tab=readme-ov-file#getting-started Flux.usingWhen(this.create(), { it.query(sql) }, Connection::close) } // クエリで指定されている名前と列番号の対応を記録 val context = sql.select.withIndex().associate { it.value to it.index } return flux .flatMap { res -> res.map { row -> mapper(ContextualReadable(row, context)) } } .asFlow() } // 名前検索だとDSL.fieldで宣言されたプロパティがひっかけられなかったため、select上に指定されたindexで読み出している class ContextualReadable(val readable: Readable, val context: Map<Field<*>, Int>) { // 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外 fun findIndex(field: Field<*>): Int = context[field] ?: (context.entries.singleOrNull { it.key.name == field.name }?.value) ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません") // 最低限よく使いそうな型に対しての変換定義 @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST") fun <T> read(index: Int, clazz: Class<T>): T = when { clazz.isEnum -> readable.get(index, String::class.java)?.let { value -> (clazz as Class<Enum<*>>).enumConstants.find { it.name == value } ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした") } clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) } else -> readable.get(index, clazz) } as T /** * Recordからの読み出しを簡単化するための関数のReadable版 * * jOOQ同様の読み出しができるとは限らないため注意! * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと */ inline fun <reified T> read(field: Field<*>): T { val index = findIndex(field) val clazz = T::class.java return read(index, clazz) } /** * fieldと同じ型で読み出してよい場合 */ inline operator fun <reified T> get(field: Field<T>): T = read(field) }
利用する様子は以下のようになります。
val cfi: ConnectionFactory = ... val sql: Select<*> = ... val result: Flow<Dto> = cfi.read(sql) { it -> Dto( it.read(FOO.C1), // カラムの読み出し方1 it[FOO.C2], // カラムの読み出し方2 ) }
以下解説します。
クエリ発行部
諸事情でパラメータ埋め込みされた状態でのクエリ発行が必要だったため、ParamType.INLINEDを指定しています。
運用の仕方によってはクエリログ経由での情報漏洩など有りうるので注意が必要です。
それ以外は通常のクエリ発行と同様になっています。
// コネクションに対するSQL発行 private fun Connection.query(sql: AttachableQueryPart): Publisher<out Result> = createStatement(sql.getSQL(ParamType.INLINED)).execute()
呼び出し用インターフェース(+ コネクション取得)部
この部分では、コネクション取得と結果マッピングに向けた準備、及び実クエリ発行・結果マッピング呼び出しを行っています。
結果マッピングに向けた準備部分に関する説明は次セクションで行います。
// 呼び出し用 fun <T> ConnectionFactory.read(sql: Select<*>, mapper: (ContextualReadable) -> T): Flow<T> { // トランザクションが開始されていればそのコネクションから、非開始なら新しく取得したコネクションからクエリを発行する val flux = TransactionSynchronizationManager .forCurrentTransaction() .flatMapMany { manager -> val con = (manager.getResource(this) as ConnectionHolder).connection con.query(sql) } .onErrorResume(NoTransactionException::class.java) { // closeが漏れるとコネクションリークするので注意、書き方はr2dbc-pool推奨より // https://github.com/r2dbc/r2dbc-pool?tab=readme-ov-file#getting-started Flux.usingWhen(this.create(), { it.query(sql) }, Connection::close) } // クエリで指定されている名前と列番号の対応を記録 val context = sql.select.withIndex().associate { it.value to it.index } return flux .flatMap { res -> res.map { row -> mapper(ContextualReadable(row, context)) } } .asFlow() }
コネクション取得部
TransactionSynchronizationManager...からFlux.usingWhen...までの間の処理には、トランザクションの有効・無効での処理切り替えが入っています。
コネクションリークを防ぐため、トランザクション非開始(= NoTransactionException発生)の場合のみclose処理を入れています。
jOOQとトランザクション関係の話は以下に関連記事があります。
結果型
ConnectionFactoryから返ってくる結果はReadableという型に格納されており、ここから結果を読み出すのは、jOOQを使う場合とかなり勝手が違います。
ContextualReadableは、この差を吸収するために定義した型です。
// 名前検索だとDSL.fieldで宣言されたプロパティがひっかけられなかったため、select上に指定されたindexで読み出している class ContextualReadable(val readable: Readable, val context: Map<Field<*>, Int>) { // 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外 fun findIndex(field: Field<*>): Int = context[field] ?: (context.entries.singleOrNull { it.key.name == field.name }?.value) ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません") // 最低限よく使いそうな型に対しての変換定義 @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST") fun <T> read(index: Int, clazz: Class<T>): T = when { clazz.isEnum -> readable.get(index, String::class.java)?.let { value -> (clazz as Class<Enum<*>>).enumConstants.find { it.name == value } ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした") } clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) } else -> readable.get(index, clazz) } as T /** * Recordからの読み出しを簡単化するための関数のReadable版 * * jOOQ同様の読み出しができるとは限らないため注意! * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと */ inline fun <reified T> read(field: Field<*>): T { val index = findIndex(field) val clazz = T::class.java return read(index, clazz) } /** * fieldと同じ型で読み出してよい場合 */ inline operator fun <reified T> get(field: Field<T>): T = read(field) }
jOOQのクエリと対応付けたReadableからの読み出し部
Readableにはnameで読み出すメソッドとindexで読み出すメソッドが定義されていますが、Readable上のnameとjOOQのfieldから取れるnameは食い違う場合が有りました。
そこで、クエリ上で指定されているfieldと指定順の対応をcontextで保持しておき、Readableからはindexで読み出す形としています。
前述のマッピングに向けた準備はここで利用されています。
// 完全一致 -> DSL.fieldで宣言されていた場合想定のファジー検索 -> 例外 fun findIndex(field: Field<*>): Int = context[field] ?: (context.entries.singleOrNull { it.key.name == field.name }?.value) ?: throw IllegalArgumentException("${field.name}はクエリ上で指定されていないか特定できません")
実読み出し部
ReadableはjOOQと関係無く定義されているため、jOOQのJSONB型への変換などは提供されていません。
そこで、よく使う系の型への変換を定義したのが以下です(もしかするとR2DBC側に何らかのコーデック登録ポイントは有るかも?)。
// 最低限よく使いそうな型に対しての変換定義 @Suppress("IMPLICIT_CAST_TO_ANY", "UNCHECKED_CAST") fun <T> read(index: Int, clazz: Class<T>): T = when { clazz.isEnum -> readable.get(index, String::class.java)?.let { value -> (clazz as Class<Enum<*>>).enumConstants.find { it.name == value } ?: throw IllegalArgumentException("${value}に対応するEnumが取得できませんでした") } clazz == JSONB::class.java -> readable.get(index, String::class.java)?.let { JSONB.valueOf(it) } else -> readable.get(index, clazz) } as T
呼び出し用関数類
呼び出し用関数は、jOOQのインターフェースに近づけたもの(get関数)と、独自定義関数に寄せたもの(read関数)との2通り用意しました。
/** * Recordからの読み出しを簡単化するための関数のReadable版 * * jOOQ同様の読み出しができるとは限らないため注意! * 読み出しに失敗する場合、Readableがサポートする型をTに指定する & マッピング関数側で適切な型変換を行うこと */ inline fun <reified T> read(field: Field<*>): T { val index = findIndex(field) val clazz = T::class.java return read(index, clazz) } /** * fieldと同じ型で読み出してよい場合 */ inline operator fun <reified T> get(field: Field<T>): T = read(field)
read関数に関しては以下をご覧下さい。
wrongwrong163377.hatenablog.com
その他注意点
jOOQでクエリを組み立てる際、単にDSLを使うとDB毎の方言に対応していないクエリが生成されてしまうことがあります。
val QUERY_BUILDER: DSLContext = DSL.using(SQLDialect.POSTGRES, ...のように、元々使っているのと同じ設定のDSLContextを作って使い回すことをお勧めします。