久しぶりにKafkaネタです。 Change Data Capture (CDC)でデータベースの変更をKafkaのイベントに変換するDebeziumを触り始めています。 PostgreSQLのレコードをKafkaに流すときに権限周りでハマったので、備忘録として残します。
Debeziumのおさらい
Debeziumはデータベースへのデータ登録、変更、削除をイベントに変換するソフトウェアです。 Kafka ConnectのSource Connectorとしてデプロイします。
Debeziumは今回扱うPostgreSQL以外にも、MySQL、Oracle、Db2といったRDBだけでなく、MongoDB、CassandraといったNoSQLもサポートしていることが特徴です。データベース固有の実装はConnector Pluginで抽象化されています。
PostgreSQL向けのConnector Pluginは、PostgreSQLの論理レプリケーションを通じてトランザクションログを読み込み、Kafkaのイベントに変換します。 そのため、PostgreSQL側にも論理レプリケーションを実行するための準備が必要です。
環境情報
| コンポーネント名 | バージョン |
|---|---|
| Kafka | 3.2 |
| PostgreSQL | 14.5 |
| Debezium | 1.9.5.Final |
課題
DebeziumのPostgreSQL Connectorは、PostgreSQLの論理レプリケーションを使用します。 PostgreSQLの論理レプリケーションはパブリケーション(PUBLICATION)と呼ばれる、レプリケーション対象のテーブルの集合および操作を定義したオブジェクトが必要になります。 Debeziumのデフォルトの設定では、Connectorがパブリケーションを自動生成します。 しかし本番環境ではConnectorのロールに渡す権限を最小限にするため、パブリケーションを事前に作成してConnectorから利用することが推奨されています。
ハマりポイントは事前にパブリケーションを作成する時にあります。
論理レプリケーションのプラグインに pgoutput を使用する場合、Debeziumのドキュメントに従って次のSQLでロールとパブリケーションを定義します。
ロールに REPLICATION 権限をつけることで論理レプリケーションを許可します。
また、作成するパブリケーション dbz_publication は、Debeziumがデフォルトで使用するパブリケーション名です。
-- Debeziumで使用するロール(=ユーザー) CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD '***'; -- 現在および今後作成される全てのテーブルをレプリケーション対象にする CREATE PUBLICATION dbz_publication FOR ALL TABLES;
しかし、この状態でDebeziumのConnectorを起動すると以下のログが出力され、permission denied for table contentsで起動に失敗します。
Snapshot step 1 - Preparing ...
Snapshot step 2 - Determining captured tables ...
Snapshot step 3 - Locking captured tables [テーブル名...]
Snapshot step 4 - Determining snapshot offset ...
Snapshot step 5 - Reading structure of captured tables
Snapshot step 6 - Persisting schema history
Snapshot step 7 - Snapshotting data ...
Snapshotting contents of N tables while still in transaction
Exporting data from table '<スキーマ名>.<テーブル名>' (1 of N tables)
For table '<スキーマ名>.<テーブル名>' using select statement: 'SELECT column1, column2, ... FROM <スキーマ名>.<テーブル名>'
Snapshot - Final stage
Producer failure [io.debezium.pipeline.ErrorHandler] io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: Snapshotting of table <スキーマ名>.<テーブル名> failed
Caused by: org.postgresql.util.PSQLException: ERROR: permission denied for table <テーブル名>
(...後略...)
Connectorの状況をKafka ConnectのREST APIで確認しても FAILED ステータスになっています。
$ curl -s http://localhost:8083/connectors/postgresql-dbz-source-connector/status | jq .
{
"name": "postgresql-dbz-source-connector",
"connector": {...},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.20.33.45:8083",
"trace": "..."
}
],
"type": "source"
}
対策
Connectorの起動失敗メッセージはDebeziumがPostgreSQLに接続に使用するロールの権限エラーです。 DebeziumはConnector起動直後に、現在のテーブルの状態のスナップショットを取得します。 このスナップショットを取得する際にSELECT文を発行します。
select statement: 'SELECT column1, column2, ... FROM <スキーマ名>.<テーブル名>
上記のロールはスナップショットを取得するテーブルのSELECT権限を事前につける必要があります。
スキーマ内の全てのテーブルを対象とする場合、次のSQLで debezium ロールにSELECT権限をつけます。
GRANT SELECT ON ALL TABLES IN SCHEMA <スキーマ名> TO debezium;
再度Connectorを起動すると、無事イベントの連携が始まりました。
$ curl -s http://localhost:8083/connectors/postgresql-dbz-source-connector/status | jq .
{
"name": "postgresql-dbz-source-connector",
"connector": {...},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.20.33.45:8083",
"trace": "..."
}
],
"type": "source"
}
以上。