この記事は さくらインターネット Advent Calendar 2025 と Distributed Computing Advent Calendar 2025 の16日目の記事です。
12/16って言ったら12/16なんです。
今回は、この記事を読んで pg_lake に興味が湧いたので触ってみました。
では、始める前に、想定読者と前提は以下の通りです。
- Iceberg大好きっ子
- Docker Compose / Docker イメージ作成は一通り分かる人
- PostgreSQLを利用したことがある人
- pg_lakeは 1/22時点の
4bc1a58のものを前提とする - 実行環境は PC端末上のWSL2のUbuntu環境とする
ハンズオンでは、最終的に以下が出来る事を目指します。
- pg_lakeの実行環境をビルドする
- Docker Composeでpg_lakeを起動(S3互換ストレージには、さくらのオブジェクトストレージを利用)
- pg_lakeの外から、SparkとTrinoを使ってpg_lakeのIcebergテーブルを参照する
では、以下の目次の流れで進めていきます。
- pg_lakeの面白さ
- pg_lakeのざっくり全体像
- ハンズオン
- 使いどころ(ユースケース)を考えてみる
- pg_lake を“使わない方がいい”ケース
- 落とし穴と運用Tips
- まとめ
- 最後に
pg_lakeの面白さ
毎回、CSVもらうけど、ちょっとJOINして見たいみたいだけなのにダルいなー
とか、、、
PostgreSQLのテーブルのデータをデータレイクに同期したい(いわゆるCDC)を考えた場合、バッチで定期的にもっていく? もしくは、今どきに、Kafka、Schema Registry、Kafka Connect用意して(あるいはDebezium)流れをつくる?
そもそもデータレイクもデータレイクハウスとしてIceberg・Delta Lake・Hudiにしちゃう?
いやいやいやいや、うち、そんなデカいものもってないし、、、そんな予算通らないって orz
なんて思うことありませんか。
そもそもさー、ちょっと前に流行ったこの記事にもあるように、抱えてるデータってそこまでデカくないんだし、1インスタンスで良くない?
なんて思うことないですか?
そんな人たちに pg_lake がささりそうです。
とてもざっくりいうと、pg_lakeは、PostgreSQLの拡張です。
しかも、PostgreSQLの世界のまま、以下が実現出来るらしいです。
- Icebergテーブルを作り
- Icebergクエリエンジン(Spark・Trino ・Hive)から、PostgreSQLで作ったIcebergテーブルをR/Wしたい1
- CSV/JSON/Parquetを
COPY FROM 's3://...'で簡単にImport(Trinoでやりたいやつ) - ロードすらダルいので、CSV/JSON/Parquetを「外部テーブル」として即クエリ
- 逆にPostgreSQLから
COPY TO 's3://...'でExportも出来る - PostgreSQLに収まらない地理空間データを一部だけ読みたい場合にS3にデータを置いといて、PostGIS と組み合わせて、欲しいところだけ読みたい
中身どうなってるか気になりますよね?っという事でDeep Diveしていきます。
pg_lakeのざっくり全体像
pg_lake公式のREADMEを元に説明します。
アーキテクチャは下図の通り。

アーキテクチャ
大きく2層構造になっています。
PostgreSQL + pg_lake拡張(図の左側の青文字):
制御プレーン(DDL・トランザクション・カタログ)を担当。
CREATE TABLE ... USING iceberg、外部ファイルを参照する外部テーブル、COPY ... FROM/TO を提供。
pg_lake は Iceberg JDBC Catalog 互換の メタテーブル(/ビュー)群を提供し、例として iceberg_tables(ユーザ向け)や lake_iceberg.*(内部/互換用)から参照できる。
pgduck_server(DuckDB)(図の右側の緑文字):
データプレーンとして実データのスキャン・フィルタ・集計などデータ処理を担当。
別プロセスとして UNIX ドメインソケット(既定 dir=/tmp, port=5332)で PostgreSQL と通信し、列指向・並列でスキャン/集計を実行します。--cache_dir でリモートファイルのキャッシュも可能です。
ストレージはオブジェクトストレージ(S3/Blob/GCSなど)を利用。
Iceberg の metadata.json と Parquet、および CSV/JSON/Parquet などの“生データ”は オブジェクトストレージ に置く。既定のIcebergテーブルの保存先は pg_lake_iceberg.default_location_prefix で設定します。
実行フロー
PostgreSQL が解析 → プッシュダウン可能な処理を pgduck_server が実行 → 結果を PostgreSQL に返却。
またデフォルトではフォールバックする設計だが、設定(例:pg_lake_table.enable_strict_pushdown)により厳格化できる。
相互運用 pg_lake は PostgreSQL をバックエンドとする Iceberg JDBC Catalog 実装を提供する。そのため Spark/Trino などは Iceberg の JDBC Catalog として PostgreSQL に接続できる。
⚠️ただし pg_lake のカタログ実装は Hive Metastore や Apache Polaris のような独立したカタログサーバとは設計思想が異なる。pg_lake は PostgreSQL 拡張として Iceberg のメタデータを管理する構成です。 Hive Metastore や Apache Polaris のような「独立したカタログサーバ」を置き換える用途は想定しない方が安全です。
コンポーネント
主なコンポーネントは以下の通り。
- pg_lake_iceberg
PostgreSQL から Iceberg を管理(USING iceberg、iceberg_tables、パーティション、VACUUM によるSmall File/古いスナップショット整理、自動VACUUM(10分間隔) など)。 - pg_lake_table(FDW)
Parquet/CSV/JSON/外部 Iceberg/Delta/GDAL(地理系) を外部テーブルとして即クエリ。create foreign table ... options (path 's3://mybucket/logs/*.csv.gz', filename 'true')で外部テーブル作成し、SELECT path FROM lake_file.list('s3://mybucket/logs/*.csv.gz')で探索も出来る。 - pg_lake_copy
COPY ... FROM/TO 's3://...'で データレイク⇄PostgreSQL/Iceberg を往復。形式/圧縮の自動判別やオプション指定に対応。\copyも可。 - pgduck_server(外部プロセス)
DuckDB をロードする実行エンジン。PostgreSQL とはソケット接続、--memory_limit/--cache_dir/--init_file_path等でチューニング可能。 - S3(オブジェクトストレージ)
Iceberg/ファイルの実体置き場。PostgreSQL はカタログ状態を DB 内に保持、
pgduck_serverは必要に応じてローカルキャッシュ。 - 周辺ユーティリティ
変換(CSV/JSON→Parquet)やログ取り込みなどの手順・例がまとまっており、“ログをIcebergテーブルにインクリメンタルに変換” を少ない手数で運用可能。
デカいIcebergテーブルを読んだら即OOMなんじゃ?
って考えますよね。
pg_lakeでは、Icebergテーブルの仕組みを上手く活用しています。
つまり、(条件が揃えば)pushdown や列指向スキャンで効率化できます。ただし pruning / pushdown の対応範囲はテーブル種別や設定に依存し、pgduck_server のメモリ制限設計は別途必要です。
pruningや制限事項の詳細については、以下を参照ください。
https://github.com/Snowflake-Labs/pg_lake/blob/main/docs/iceberg-tables.md#partition-pruning
PostgreSQLテーブルの型とIcebergテーブルの型の対応状況は?
ドキュメントに記載なかったのでソースから探してみました3。
主なデータ型の対応関係を Postgres → Iceberg の向きで整理すると以下の通りです。
| PostgreSQL 型 | Iceberg Primitive Type | ざっくり意味 |
|---|---|---|
INTEGER |
int |
32-bit 符号付き整数 |
BIGINT |
long |
64-bit 符号付き整数 |
TEXT, VARCHAR |
string |
UTF-8 文字列 |
TIMESTAMP |
timestamp |
タイムゾーン無し timestamp(マイクロ秒精度, ローカルタイム) |
TIMESTAMPTZ |
timestamptz |
タイムゾーン付き timestamp(マイクロ秒精度, UTC 基準) |
BOOLEAN |
boolean |
真偽値 |
NUMERIC(p,s) |
decimal(p,s) |
固定小数点(最大 precision 38) |
REAL |
float |
32-bit 浮動小数点 |
DOUBLE PRECISION |
double |
64-bit 浮動小数点 |
DATE |
date |
日付(epoch からの日数) |
UUID |
uuid |
UUID |
BYTEA |
binary |
バイナリデータ |
外にも以下のような型も対応しています。
Icebergのmap型については、pg_lake独自PostgreSQL拡張機能 pg_map を使うことで利用できます4。
| PostgreSQL 型 | Iceberg Primitive Type | ざっくり意味 |
|---|---|---|
ARRAY(例: INTEGER[]) |
list<…> |
配列(要素型は元の配列要素に対応) |
composite type(例: (a int, b text)) |
struct<…> |
複合型(行型)。フィールドが struct の fields として展開される |
JSONB |
string |
JSON を文字列として格納(serialized JSON) |
pg_map の map(例: map_type.key_text_val_int) |
map<string,int> |
pg_map は内部的に composite+配列で表現するが、Iceberg では map としてスキーマ化される |
他の型については、pg_lake/pg_lake_table/tests/pytests/test_iceberg_types.py にテストケースがまとまっているので、参考にしてください。
また、現状ではクエリする際の制限事項 もあるので詳しくはそこを参照ください。
Icebergテーブルのメンテナンス
Icebergテーブルでは、古いスナップショットや小さなデータファイルを削除といったメンテナンスが必要です。 pg_lake では Iceberg テーブルのメンテナンスとして VACUUM コマンド を使います。また、自動 VACUUM 機能があり、10分間隔で自動的に実行されます。
自動VACUUMをテーブル個別に無効化する場合は以下のようにします。
-- テーブル作成時に無効化する場合 CREATE TABLE test_auto_vacuum(id INT) USING iceberg WITH (autovacuum_enabled='False'); -- 途中から無効化する場合 CREATE TABLE other_test_auto_vacuum(id INT) USING iceberg; ALTER FOREIGN TABLE other_test_auto_vacuum OPTIONS (ADD autovacuum_enabled 'false');
また、10分間隔の自動VACUUMの設定を変更したい場合は、pg_lake_iceberg.autovacuum_naptime(sec)を調整します。
ただ、現状は、テーブル個別に設定出来ないので、テーブル個別に設定したい場合は、自動VACUUMを無効化して、手動でVACUUMコマンドを実行する形になります。
他にもpg_lakeの設定値としては以下のものが有るようです。
postgres=# SELECT name, setting, unit, context, source FROM pg_settings WHERE name LIKE 'pg_lake%' ORDER BY 1; name | setting | unit | context | source ------------------------------------------------------+---------------------------+------+-----------+-------------------- pg_lake_engine.cache_manager_interval | 10000 | ms | superuser | default pg_lake_engine.enable_cache_manager | on | | superuser | default pg_lake_engine.max_cache_size | 20480 | MB | superuser | default pg_lake_iceberg.autovacuum | on | | sighup | default pg_lake_iceberg.autovacuum_naptime | 600 | s | sighup | default pg_lake_iceberg.default_avro_writer_block_size_kb | 64 | kB | superuser | default pg_lake_iceberg.default_location_prefix | s3://yassan-test/pg_lake/ | | superuser | configuration file pg_lake_iceberg.enable_object_store_catalog | on | | sighup | default pg_lake_iceberg.log_autovacuum_min_duration | 600000 | ms | sighup | default pg_lake_iceberg.object_store_catalog_location_prefix | | | sighup | default pg_lake_table.default_parquet_version | v1 | | superuser | default pg_lake_table.enable_full_query_pushdown | on | | user | default pg_lake_table.enable_strict_pushdown | on | | user | default pg_lake_table.target_file_size_mb | 512 | MB | user | default pg_lake_table.target_row_group_size_mb | 512 | MB | superuser | default (15 rows) postgres=# SHOW pg_lake_iceberg.manifest_min_count_to_merge; pg_lake_iceberg.manifest_min_count_to_merge --------------------------------------------- 100 (1 row) postgres=# SHOW pg_lake_iceberg.target_manifest_size_kb; pg_lake_iceberg.target_manifest_size_kb ----------------------------------------- 8MB (1 row) postgres=# SHOW pg_lake_iceberg.max_snapshot_age; pg_lake_iceberg.max_snapshot_age ---------------------------------- 1800 (1 row)
現状では更新頻度が高いIcebergテーブルの場合、自動VACUUMを無効化して、運用スケジュールに合わせて手動でVACUUMコマンドを実行するのが良さそうです。 また、VACUUMコマンドでパーティションの指定も出来ないのでVACUUMする範囲とテーブルの更新範囲が被るとVACUUMまたはクエリが失敗する可能性が出てくるので、自動VACUUMの頻度やクエリのリトライなどの考慮する必要があります。
では、次にハンズオンで実際に触ってみます。
ハンズオン
環境説明
オブジェクトストレージには、AWS S3互換の”さくらのオブジェクトストレージ”を利用します。
pg_lakeの実行環境はDocker Composeで構築します。 pg_lakeへ外から参照する環境として、Docker Composeの別プロジェクトを用意し、SparkとTrinoコンテナを起動します。
pg_lake環境の構築
pg_lakeのDockerイメージは公式から提供されていないので、自分でビルドします。 公式ドキュメント docker/LOCAL_DEV.md を参考にすればOKです。
TaskというTask Runnerを使う前提になっているので、まずはTaskをインストールしてください5。
pg_lakeのイメージのビルド
pg_lakeのリポジトリをcloneし、docker/ まで移動します。
Taskをインストールした後に、task build:local を実行してコンテナイメージをビルドします。
pg_lakeの外から接続出来るようにする
pg_lakeコンテナの外から接続出来るように改変します。
pg_lake/docker/scripts/entrypoint-postgres.sh を以下のように変更します。
やっていることは pg_hba.conf で外からの接続を許可し、postgresql.conf で listen_addresses='*' に変更しています。また、今回は横着して外から繋ぐ際のユーザとしてpostgresユーザを使うので、 postgres ユーザのパスワードを .envのPOSTGRES_PASSWORDを使うようにしています6。
▶ 🗒️
pg_lake/docker/scripts/entrypoint-postgres.shを表示
#!/bin/bash set -euo pipefail trap "echo 'Caught termination signal. Exiting...'; exit 0" SIGINT SIGTERM # Ensure PGBASEDIR and PG_MAJOR are set PGBASEDIR=${PGBASEDIR:-/home/postgres} PG_MAJOR=${PG_MAJOR:-18} # Create and fix permissions for temporary directory BEFORE starting PostgreSQL # Docker volumes are created with root ownership, but postgres user needs write access mkdir -p ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/base/pgsql_tmp sudo chown -R postgres:postgres ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/base/pgsql_tmp sudo chmod 700 ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/base/pgsql_tmp # Update pg_hba.conf # 外からつなぐようにオープンにする # ローカルはpeer(コンテナ内でpostgresユーザなら無パスワードでOK)、 # それ以外はmd5/scramでパスワード必須 # サンプル用(ザル設定) cat >"${PGBASEDIR}/pgsql-${PG_MAJOR}/data/pg_hba.conf" <<'HBA' local all postgres peer local all all md5 host all all 127.0.0.1/32 md5 host all all ::1/128 md5 host all all 0.0.0.0/0 md5 HBA # Update postgresql.conf # !!IMPORTANT!!: NOT RECOMMENDED FOR PRODUCTION # ALLOW ACCESS FROM ANY IP ADDRESS typically used for development to access the database from outside the container echo "listen_addresses = '*'" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf echo "port = 5432" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf echo "shared_preload_libraries = 'pg_extension_base'" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf echo "pg_lake_iceberg.default_location_prefix = 's3://yassan-test/pg_lake/'" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf echo "pg_lake_engine.host = 'host=${PGBASEDIR}/pgduck_socket_dir port=5332'" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf # Start PostgreSQL server using explicit path ${PGBASEDIR}/pgsql-${PG_MAJOR}/bin/pg_ctl -D ${PGBASEDIR}/pgsql-${PG_MAJOR}/data start -l ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/logfile # --- postgres のパスワードを設定。.envのPOSTGRES_PASSWORDを使う --- if [ -n "${POSTGRES_PASSWORD:-}" ]; then PW_ESCAPED=${POSTGRES_PASSWORD//\'/\'\'} psql -U postgres -v ON_ERROR_STOP=1 -c \ "ALTER ROLE postgres WITH PASSWORD '${PW_ESCAPED}';" fi # Run initialization script using explicit path ${PGBASEDIR}/pgsql-${PG_MAJOR}/bin/psql -U postgres -f /init-postgres.sql sleep infinity
.envのPOSTGRES_PASSWORDを使うようにしたので、pg_lake/docker/.env を以下のように変更します。
PG_LAKE_REF=main PG_MAJOR=18 POSTGRES_PASSWORD=StrongPassword_here
オブジェクトストレージをlocalstackからさくらのオブジェクトストレージに変更する
pg_lakeのDocker Composeファイルでは、オブジェクトストレージとしてlocalstackを使うようになっていますが、今回はさくらのオブジェクトストレージを使うように変更します7。
pg_lake/docker/scripts/init-pgduck-server.sql を以下のように変更します。
こうすることでpg_lake起動時に、pgduck_serverが利用するオブジェクトストレージ用のS3シークレットが作成されます。
-- create s3 secret for sakura object storage CREATE OR REPLACE SECRET sakura_s3 ( TYPE s3, PROVIDER config, SCOPE 's3://yassan-test', USE_SSL true, KEY_ID '<アクセスキーID>', SECRET '<シークレットアクセスキー>', URL_STYLE 'vhost', -- path-style じゃなくても良いよ ENDPOINT 's3.isk01.sakurastorage.jp', REGION 'jp-north-1' );
⚠️実運用ではアクセスキーをSQLに直書きせず、環境変数経由で渡す・最小権限のキーを使用するなどの対策を推奨します8。
Docker Composeファイルの変更
上記を踏まえて、Docker Composeファイルを以下のように変更します。
オブジェクトストレージにはさくらのクラウドのオブジェクトストレージを使うので、localstackのサービス定義をコメントアウトしています。
▶ 🗒️ 変更した
docker/docker-compose.ymlを表示
# https://knowledge.sakura.ad.jp/48152/#i-2 x-sakura-s3-workaround: &sakura_s3_workaround AWS_REQUEST_CHECKSUM_CALCULATION: "WHEN_REQUIRED" AWS_RESPONSE_CHECKSUM_VALIDATION: "WHEN_REQUIRED" # pg_lakeの定義 services: pg_lake-postgres: image: pg_lake:local pull_policy: if_not_present container_name: pg_lake ports: - "5432:5432" volumes: - ./scripts/entrypoint-postgres.sh:/entrypoint-postgres.sh - ./scripts/init-postgres.sql:/init-postgres.sql - pgduck-unix-socket-volume:/home/postgres/pgduck_socket_dir - pg-shared-tmp-dir-volume:/home/postgres/pgsql-${PG_MAJOR:-18}/data/base/pgsql_tmp entrypoint: ["/entrypoint-postgres.sh"] restart: unless-stopped healthcheck: test: ["CMD", "psql", "-c", "create table test(a int) using iceberg; drop table test;"] interval: 6s timeout: 2s start_period: 20s retries: 3 env_file: - .env cap_add: - SYS_PTRACE depends_on: - pgduck-server pgduck-server: image: pgduck-server:local pull_policy: if_not_present container_name: pgduck-server # NOTE: pgduck-server only listens on Unix sockets, not TCP # To access DuckDB from the host, connect via pg_lake-postgres container which shares the Unix socket # Example: psql -h localhost -p 5432 -U postgres -c "SELECT * FROM duckdb_fdw.some_table" # IMPORTANT: Both containers must share the same temp directory volume so pgduck_server can access temp files volumes: - ./scripts/entrypoint-pgduck-server.sh:/entrypoint-pgduck-server.sh - ./scripts/init-pgduck-server.sql:/init-pgduck-server.sql - pgduck-unix-socket-volume:/home/postgres/pgduck_socket_dir - pg-shared-tmp-dir-volume:/home/postgres/pgsql-${PG_MAJOR:-18}/data/base/pgsql_tmp entrypoint: ["/entrypoint-pgduck-server.sh"] restart: unless-stopped environment: <<: *sakura_s3_workaround healthcheck: test: ["CMD", "psql", "-p", "5332", "-h", "/home/postgres/pgduck_socket_dir", "-c", "SELECT 1;"] interval: 6s timeout: 2s start_period: 20s retries: 3 cap_add: - SYS_PTRACE # depends_on: # - localstack # localstack: # container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}" # image: localstack/localstack # pull_policy: if_not_present # ports: # - "127.0.0.1:4566:4566" # LocalStack Gateway # - "127.0.0.1:4510-4559:4510-4559" # external services port range # environment: # # LocalStack configuration: https://docs.localstack.cloud/references/configuration/ # - DEBUG=${DEBUG:-0} # volumes: # - "${LOCALSTACK_VOLUME_DIR:-./.volume}:/var/lib/localstack" # - "${PWD}/scripts/init-s3.sh:/etc/localstack/init/ready.d/init-s3.sh" # volumes: pgduck-unix-socket-volume: pg-shared-tmp-dir-volume: # Shared temp directory for both PostgreSQL and pgduck_server
後は、 task compose:up を実行すればpg_lake環境が起動します。
ケース0:Icebergテーブルを作成してみる
毎度おなじみ TLC Trip Record Data - TLC を使ってIcebergテーブルを作ってみます。
-- pg_lakeコンテナに入る $ docker compose exec pg_lake-postgres bash [postgres@52d96115560f ~]$ psql psql (18.0) Type "help" for help. postgres=# \d List of relations Schema | Name | Type | Owner --------+-------------------+-------+---------- public | geography_columns | view | postgres public | geometry_columns | view | postgres public | spatial_ref_sys | table | postgres (3 rows) -- Icebergテーブルを作成 -- スキーマ情報はParquetファイルから推測させ、初期データはParquetファイルを取り込む postgres=# CREATE TABLE trips_yellow () USING iceberg WITH (load_from = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet') ; CREATE TABLE -- Icebergテーブルが作成されたことを確認(外部テーブルとして作成されていた) postgres=# \d+ List of relations Schema | Name | Type | Owner | Persistence | Access method | Size | Description --------+-------------------+---------------+----------+-------------+---------------+---------+------------- public | geography_columns | view | postgres | permanent | | 0 bytes | public | geometry_columns | view | postgres | permanent | | 0 bytes | public | spatial_ref_sys | table | postgres | permanent | heap | 6936 kB | public | trips_yellow | foreign table | postgres | permanent | | 68 MB | (4 rows) -- Icebergテーブルの中身を確認 postgres=# SELECT * FROM trips_yellow LIMIT 2 ; vendorid | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | ratecodeid | store_and_fwd_flag | pulocationid | dolocationid | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee | cbd_congestion_fee ----------+----------------------+-----------------------+-----------------+---------------+------------+--------------------+--------------+--------------+--------------+-------------+-------+---------+------------+--------------+-----------------------+--------------+----------------------+-------------+-------------------- 2 | 2025-01-17 18:08:04 | 2025-01-17 18:19:55 | | 0 | | | 161 | 186 | 0 | 13.22 | 0 | 0.5 | 0 | 0 | 1 | 17.97 | | | 0.75 2 | 2025-01-17 18:16:20 | 2025-01-17 18:33:22 | | 0.01 | | | 234 | 143 | 0 | 21.09 | 0 | 0.5 | 0 | 0 | 1 | 25.84 | | | 0.75 (2 rows) -- Icebergテーブルの行数を確認 postgres=# SELECT count(1) FROM trips_yellow ; count --------- 3475226 (1 row) -- Icebergテーブルにさらにデータを追加で読み込んでみる(S3バケット指定してもOK) postgres=# COPY trips_yellow FROM 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet'; COPY 3577543 -- 行数が増えていることを確認 postgres=# SELECT count(1) FROM trips_yellow ; count --------- 7052769 (1 row) -- Icebergテーブルの詳細を確認 postgres=# \d trips_yellow Foreign table "public.trips_yellow" Column | Type | Collation | Nullable | Default | FDW options -----------------------+-----------------------------+-----------+----------+---------+------------- vendorid | integer | | | | tpep_pickup_datetime | timestamp without time zone | | | | tpep_dropoff_datetime | timestamp without time zone | | | | passenger_count | bigint | | | | trip_distance | double precision | | | | ratecodeid | bigint | | | | store_and_fwd_flag | text | | | | pulocationid | integer | | | | dolocationid | integer | | | | payment_type | bigint | | | | fare_amount | double precision | | | | extra | double precision | | | | mta_tax | double precision | | | | tip_amount | double precision | | | | tolls_amount | double precision | | | | improvement_surcharge | double precision | | | | total_amount | double precision | | | | congestion_surcharge | double precision | | | | airport_fee | double precision | | | | cbd_congestion_fee | double precision | | | | Server: pg_lake_iceberg FDW options: (catalog 'postgres', location 's3://yassan-test/pg_lake/postgres/public/trips_yellow/18747') postgres=#
ケース1:PostgreSQLとして内部的にIcebergテーブルを参照
このケースでは、pg_lakeにJDBCでPostgreSQLとして接続し、pg_lake内のIcebergテーブルをSQLクエリしてみます。
クライアントのDocker Composeファイルは以下のようになります9。
# https://knowledge.sakura.ad.jp/48152/#i-2 x-sakura-s3: &sakura_s3 AWS_REQUEST_CHECKSUM_CALCULATION: "WHEN_REQUIRED" AWS_RESPONSE_CHECKSUM_VALIDATION: "WHEN_REQUIRED" BACKET_NAME: ${BACKET_NAME} AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} AWS_REGION: "jp-north-1" # クライアントの定義 services: spark: image: docker.io/library/spark:4.0.1-python3 container_name: spark ports: - "4040:4040" command: ["/bin/bash","-lc","sleep infinity"] # /opt/spark/ivy-cache が root:root で sparkユーザで書き込めないので user: root volumes: # spark.jars.packages のcache - spark-ivy:/opt/spark/ivy-cache # PySparkなどで使うスクリプト - ./scripts:/opt/spark/scripts environment: <<: *sakura_s3 PATH: "/opt/spark/bin:${PATH}" # JDBC 接続先や資格情報は ENV で差し込む PGHOST: "host.docker.internal" PGPORT: "5432" PGDATABASE: "postgres" PG_USER: "postgres" PG_PASSWORD: "StrongPassword_here" restart: unless-stopped trino: image: trinodb/trino:479 container_name: trino ports: - "8080:8080" volumes: # Trinoのカタログ設定を永続化 - ./trino/log.properties:/etc/trino/log.properties:ro - ./trino/pgl.properties:/etc/trino/catalog/pgl.properties:ro - ./trino/pg_iceberg.properties:/etc/trino/catalog/pg_iceberg.properties:ro environment: <<: *sakura_s3 CATALOG_MANAGEMENT: static # JDBC 接続先や資格情報は ENV で差し込む PGHOST: "host.docker.internal" PGPORT: "5432" PGDATABASE: "postgres" PG_USER: "postgres" PG_PASSWORD: "StrongPassword_here" restart: unless-stopped volumes: spark-ivy:
TrinoからPostgreSQLとしてpg_lakeに接続してクエリ
TrinoからPostgreSQLとしてpg_lakeに接続するためのカタログ設定ファイルは以下のようになります。
▶ 📜
pg_lake_client/trino-catalog/pgl.properties を表示
connector.name=postgresql connection-url=jdbc:postgresql://${ENV:PGHOST}:${ENV:PGPORT}/${ENV:PGDATABASE} connection-user=${ENV:PG_USER} connection-password=${ENV:PG_PASSWORD}
この状態で、ケース0で作成したIcebergテーブルをSQLクエリしてみます。
$ docker compose exec trino trino \ --output-format=ALIGNED \ --execute "select * from pgl.public.trips_yellow limit 2" vendorid | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | ratecodeid | store_and_fwd_flag | pulocationid | dolocationid | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee | cbd_congestion_fee ----------+----------------------------+----------------------------+-----------------+---------------+------------+--------------------+--------------+--------------+--------------+-------------+-------+---------+------------+--------------+-----------------------+--------------+----------------------+-------------+-------------------- 2 | 2025-01-17 18:08:04.000000 | 2025-01-17 18:19:55.000000 | NULL | 0.0 | NULL | NULL | 161 | 186 | 0 | 13.22 | 0.0 | 0.5 | 0.0 | 0.0 | 1.0 | 17.97 | NULL | NULL | 0.75 2 | 2025-01-17 18:16:20.000000 | 2025-01-17 18:33:22.000000 | NULL | 0.01 | NULL | NULL | 234 | 143 | 0 | 21.09 | 0.0 | 0.5 | 0.0 | 0.0 | 1.0 | 25.84 | NULL | NULL | 0.75 (2 rows)
SparkからPostgreSQLとしてpg_lakeに接続してSQLクエリ
Trinoと同様にSparkでもPostgreSQLとしてpg_lakeに接続し、IcebergテーブルをSQLクエリしてみます。
$ docker compose exec spark spark-submit \ --conf spark.log.level=WARN \ --conf spark.jars.packages=org.postgresql:postgresql:42.7.8 \ --conf spark.jars.ivy=/opt/spark/ivy-cache \ /opt/spark/scripts/pglake_read.py WARNING: Using incubator modules: jdk.incubator.vector :: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml Ivy Default Cache set to: /opt/spark/ivy-cache/cache The jars for the packages stored in: /opt/spark/ivy-cache/jars org.postgresql#postgresql added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-dc8745c5-8cf1-4ecc-97f7-f4fffb30ca0c;1.0 confs: [default] found org.postgresql#postgresql;42.7.8 in central found org.checkerframework#checker-qual;3.49.5 in central :: resolution report :: resolve 111ms :: artifacts dl 3ms :: modules in use: org.checkerframework#checker-qual;3.49.5 from central in [default] org.postgresql#postgresql;42.7.8 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 2 | 0 | 0 | 0 || 2 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-dc8745c5-8cf1-4ecc-97f7-f4fffb30ca0c confs: [default] 0 artifacts copied, 2 already retrieved (0kB/5ms) 26/01/03 11:52:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties 26/01/03 11:52:53 INFO SparkContext: Running Spark version 4.0.1 26/01/03 11:52:53 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64 26/01/03 11:52:53 INFO SparkContext: Java version 17.0.17 Setting Spark log level to "WARN". +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+ |vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|cbd_congestion_fee| +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+ |1 |2025-01-11 23:42:35 |2025-01-11 23:58:44 |1 |2.1 |1 |N |163 |48 |1 |13.5 |4.25 |0.5 |3.85 |0.0 |1.0 |23.1 |2.5 |0.0 |0.75 | |1 |2025-01-11 23:50:09 |2025-01-11 23:57:53 |1 |1.0 |1 |N |148 |114 |1 |7.9 |4.25 |0.5 |2.75 |0.0 |1.0 |16.4 |2.5 |0.0 |0.75 | |1 |2025-01-11 23:04:15 |2025-01-11 23:13:04 |1 |0.9 |1 |N |230 |162 |1 |8.6 |4.25 |0.5 |0.0 |0.0 |1.0 |14.35 |2.5 |0.0 |0.75 | |1 |2025-01-11 23:59:14 |2025-01-12 00:07:42 |1 |1.3 |1 |N |114 |107 |1 |9.3 |3.25 |0.5 |2.8 |0.0 |1.0 |16.85 |2.5 |0.0 |0.75 | |1 |2025-01-11 23:48:47 |2025-01-12 00:00:52 |2 |1.7 |1 |N |90 |79 |1 |12.1 |4.25 |0.5 |3.55 |0.0 |1.0 |21.4 |2.5 |0.0 |0.75 | +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+ only showing top 5 rows
また、PySparkのスクリプトは以下のようになります。
▶ 📜
pg_lake_client/scripts/pglake_read.py を表示
import os from pyspark.sql import SparkSession # 環境変数から接続先を取得(compose の environment で渡す) host = os.getenv("PGHOST", "127.0.0.1") port = os.getenv("PGPORT", "5432") db = os.getenv("PGDATABASE", "postgres") user = os.getenv("PG_USER", "postgres") pw = os.getenv("PG_PASSWORD", "") url = f"jdbc:postgresql://{host}:{port}/{db}" props = { "user": user, "password": pw, "driver": "org.postgresql.Driver", "fetchsize": "1000", } spark = SparkSession.builder.appName("pg_jdbc_read").getOrCreate() table_or_query = "(SELECT * FROM public.trips_yellow ORDER BY 1 LIMIT 10000) t" df = spark.read.jdbc(url=url, table=table_or_query, properties=props) df.show(5, truncate=False) spark.stop()
ケース2:Icebergカタログ(JDBC)としてSparkやTrinoからクエリ
このケースでは「PostgreSQLとして接続する」のではなく、pg_lake を Icebergのカタログ実装として扱う 検証します。
SparkからIcebergカタログとしてpg_lakeに接続してクエリ
$ export BUCKET_NAME=yassan-test $ docker compose exec spark spark-submit \ --conf spark.log.level=WARN \ --conf spark.jars.packages=org.postgresql:postgresql:42.7.8,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \ --conf spark.jars.ivy=/opt/spark/ivy-cache \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.defaultCatalog=postgres \ --conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.postgres.type=jdbc \ --conf spark.sql.catalog.postgres.uri=jdbc:postgresql://host.docker.internal:5432/postgres \ --conf spark.sql.catalog.postgres.jdbc.user=postgres \ --conf spark.sql.catalog.postgres.jdbc.password=StrongPassword_here \ --conf spark.sql.catalog.postgres.jdbc.verifyServerCertificate=false \ --conf spark.sql.catalog.postgres.jdbc.useSSL=false \ --conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \ --conf spark.sql.catalog.postgres.warehouse=s3://${BUCKET_NAME}/pg_lake \ --conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.postgres.s3.endpoint=https://s3.isk01.sakurastorage.jp \ /opt/spark/scripts/rest_read.py WARNING: Using incubator modules: jdk.incubator.vector :: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml Ivy Default Cache set to: /opt/spark/ivy-cache/cache The jars for the packages stored in: /opt/spark/ivy-cache/jars org.postgresql#postgresql added as a dependency org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency org.apache.iceberg#iceberg-aws-bundle added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-23e5b16c-74b8-4cae-8340-ecbbe0a7817f;1.0 confs: [default] found org.postgresql#postgresql;42.7.8 in central found org.checkerframework#checker-qual;3.49.5 in central found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 in central found org.apache.iceberg#iceberg-aws-bundle;1.10.1 in central :: resolution report :: resolve 175ms :: artifacts dl 9ms :: modules in use: org.apache.iceberg#iceberg-aws-bundle;1.10.1 from central in [default] org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 from central in [default] org.checkerframework#checker-qual;3.49.5 from central in [default] org.postgresql#postgresql;42.7.8 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 4 | 0 | 0 | 0 || 4 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-23e5b16c-74b8-4cae-8340-ecbbe0a7817f confs: [default] 0 artifacts copied, 4 already retrieved (0kB/8ms) 26/02/01 00:41:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties 26/02/01 00:41:51 INFO SparkContext: Running Spark version 4.0.1 26/02/01 00:41:51 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64 26/02/01 00:41:51 INFO SparkContext: Java version 17.0.17 Setting Spark log level to "WARN". 26/02/01 00:41:56 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1 +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+ |vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|cbd_congestion_fee| +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+ |2 |2025-01-17 18:08:04 |2025-01-17 18:19:55 |NULL |0.0 |NULL |NULL |161 |186 |0 |13.22 |0.0 |0.5 |0.0 |0.0 |1.0 |17.97 |NULL |NULL |0.75 | |2 |2025-01-17 18:16:20 |2025-01-17 18:33:22 |NULL |0.01 |NULL |NULL |234 |143 |0 |21.09 |0.0 |0.5 |0.0 |0.0 |1.0 |25.84 |NULL |NULL |0.75 | |1 |2025-01-17 18:01:26 |2025-01-17 18:22:49 |NULL |6.7 |NULL |NULL |231 |141 |0 |25.7 |0.0 |0.5 |0.0 |0.0 |1.0 |30.45 |NULL |NULL |0.75 | |2 |2025-01-17 18:51:18 |2025-01-17 19:11:39 |NULL |0.0 |NULL |NULL |234 |239 |0 |-4.75 |0.0 |0.5 |0.0 |0.0 |1.0 |6.37 |NULL |NULL |0.75 | |2 |2025-01-17 18:14:51 |2025-01-17 18:25:51 |NULL |0.0 |NULL |NULL |193 |179 |0 |10.04 |0.0 |0.5 |0.0 |0.0 |1.0 |11.54 |NULL |NULL |0.0 | +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+ only showing top 5 rows
一応、読めていますが、、JDBCカタログのスキーマバージョンがV0のままなので、以下のWARNが出ています。
26/02/01 00:41:56 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
↑の実行コマンドにて、以下を足しています。
--conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \
V0 → V1 に変更して再実行すると、メタデータテーブルのスキーマが更新され、WARNも出なくなるはずなのですが、、
以下の様に自動マイグレーションを実行しようとしてエラーになります。
Setting Spark log level to "WARN". Traceback (most recent call last): File "/opt/spark/scripts/rest_read.py", line 10, in <module> df = spark.read.format("iceberg").load("postgres.public.trips_yellow") File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 311, in load File "/opt/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__ File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 282, in deco File "/opt/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/protocol.py", line 327, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o48.load. : org.apache.iceberg.jdbc.UncheckedSQLException: Cannot check and eventually update SQL schema (略) Caused by: org.postgresql.util.PSQLException: ERROR: ALTER action ADD COLUMN cannot be performed on relation "iceberg_tables" Detail: This operation is not supported for views. (略)
PostgreSQL側でALTER TABLEができないのは、pg_lakeの pg_lake_iceberg 拡張が作成する iceberg_tables がビューとして扱われているのが原因のようです。その為、pg_lake以外のクエリエンジン(SparkやTrino)からJDBCカタログとして接続する場合は、スキーマバージョンをV0のまま使う必要があります10。
したがって、少なくともこの組合せでは “Iceberg JDBC Catalog としての運用” は難しいです。
▶ 🗒️ pg_lake-postgresのPostgreSQLを見た際のログを表示
$ docker compose exec pg_lake-postgres bash [postgres@ea3039d1b599 ~]$ psql psql (18.1) Type "help" for help. postgres=# copy trips_yellow from 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet'; COPY 3577543 postgres=# SELECT table_schema, table_name, table_type FROM information_schema.tables WHERE table_schema = 'lake_iceberg' AND table_name = 'tables'; table_schema | table_name | table_type --------------+------------+------------ lake_iceberg | tables | VIEW (1 row) postgres=# SELECT definition FROM pg_views WHERE schemaname = 'lake_iceberg' AND viewname = 'tables'; definition ----------------------------------------------------------------------------------------- SELECT (current_database())::text AS catalog_name, + ((pg_class.relnamespace)::regnamespace)::character varying(255) AS table_namespace,+ (pg_class.relname)::character varying(255) AS table_name, + tables_internal.metadata_location, + tables_internal.previous_metadata_location + FROM (lake_iceberg.tables_internal + JOIN pg_class ON ((pg_class.oid = (tables_internal.table_name)::oid))) + UNION ALL + SELECT tables_external.catalog_name, + tables_external.table_namespace, + tables_external.table_name, + tables_external.metadata_location, + tables_external.previous_metadata_location + FROM lake_iceberg.tables_external + WHERE ((tables_external.catalog_name)::text <> (current_database())::text); (1 row) postgres=# SELECT * FROM lake_iceberg.tables ORDER BY table_namespace, table_name; catalog_name | table_namespace | table_name | metadata_location | previous_metadata_location --------------+-----------------+--------------+-------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------- postgres | public | trips_yellow | s3://yassan-test/pg_lake/postgres/public/trips_yellow/18635/metadata/00002-505466b5-51e0-4282-9827-2d7cbaabb941.metadata.json | s3://yassan-test/pg_lake/postgres/public/trips_yellow/18635/metadata/00001-32f3d086-e001-464b-82d3-b4bbc313bdd5.metadata.json (1 row)
pg_lakeのIcebergテーブルのドキュメントの Accessing Iceberg tables with Spark を見てると、org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1 で確認していてかなり古い。
JDBCカタログが使うDBスキーマが v1.6.0 で V0/V1の概念が導入され、V1でView対応していたりするので、そこが影響してそうです。
TrinoからIcebergカタログとしてpg_lakeに接続してクエリ
TrinoからIcebergカタログとしてpg_lakeに接続するためのコネクターの設定ファイルは以下のようになります。
▶ 📜
pg_lake_client/trino-catalog/pg_iceberg.properties を表示
connector.name=iceberg
iceberg.catalog.type=jdbc
iceberg.jdbc-catalog.catalog-name=postgres
iceberg.jdbc-catalog.driver-class=org.postgresql.Driver
iceberg.jdbc-catalog.connection-url=jdbc:postgresql://${ENV:PGHOST}:${ENV:PGPORT}/${ENV:PGDATABASE}
iceberg.jdbc-catalog.connection-user=${ENV:PG_USER}
iceberg.jdbc-catalog.connection-password=${ENV:PG_PASSWORD}
iceberg.jdbc-catalog.schema-version=V0
iceberg.jdbc-catalog.default-warehouse-dir=s3://${ENV:BUCKET_NAME}/pg_lake
fs.native-s3.enabled=true
s3.region=${ENV:AWS_REGION}
s3.endpoint=https://s3.isk01.sakurastorage.jp
s3.aws-access-key=${ENV:AWS_ACCESS_KEY_ID}
s3.aws-secret-key=${ENV:AWS_SECRET_ACCESS_KEY}
この状態で、ケース0で作成したIcebergテーブルをSQLクエリしてみます。
$ docker compose exec trino trino \ --output-format=ALIGNED \ --execute "select * from pg_iceberg.public.trips_yellow limit 2" Query 20260201_010042_00000_etnxt failed: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
以下の様にJDBCカタログのスキーマバージョンがV0を指定していますがエラーになります。
iceberg.jdbc-catalog.schema-version=V0
Trinoのログを確認すると、以下のWARNが出ていました。
trino | 2026-02-01T01:00:42.833Z WARN Query-20260201_010042_00000_etnxt-162 org.apache.iceberg.jdbc.JdbcCatalog JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1 trino | 2026-02-01T01:00:42.862Z INFO dispatcher-query-4 io.trino.event.QueryMonitor TIMELINE: Query 20260201_010042_00000_etnxt :: FAILED (GENERIC_INTERNAL_ERROR) :: elapsed 405ms :: planning 0ms :: waiting 399ms :: scheduling 0ms :: running 0ms :: finishing 0ms :: begin 2026-02-01T01:00:42.442Z :: end 2026-02-01T01:00:42.847Z
Trinoの起動ログにも V0 の設定が反映されているようです
trino | 2026-02-01T01:00:15.210Z INFO main io.trino.bootstrap.catalog.pg_iceberg iceberg.jdbc-catalog.schema-version V1 V0 JDBC catalog schema version
まとめると、試したことから分かったことは以下の通り。
iceberg.jdbc-catalog.schema-version=V0を設定しても、実行時に V1 相当の振る舞い(View対応前提)を求めるエラーとなった- 起動ログ上では V0 が設定値として表示されている
このことから以下と考えられる。
- Trino側の Iceberg JDBC catalog 実装が実行時に V1 相当のスキーマを前提としている可能性
- あるいは pg_lake 側の JDBC 互換メタ実装が Iceberg 側の期待するDDL変更に対応していない可能性
結論:
現状11では、SparkやTrinoからIcebergカタログとしてpg_lakeを扱うのは止めたほうが良さそうです。
そもそも、あえてIcebergカタログとして接続しなくても、利用しやすいPostgreSQL接続で十分だった。
追記(2026-02-23):
Trinoでエラーになる件は、以下の通り、Trino側のバグでした。これがマージされれば回避できそうです。
ケース3:S3上のファイルを直接参照したり、テーブルをS3に吐く
ここでは前提として s3://yassan-test/data-yellow_trip/ 以下に yellow_tripdata_YYYY-MM.parquetのParquetファイルがあるとします。
バケット上にあるParquetファイルをテーブルとして参照する
# pg_lakeコンテナに入る $ docker compose exec pg_lake-postgres bash [postgres@52d96115560f ~]$ psql # 現状の確認 postgres=# \d List of relations Schema | Name | Type | Owner --------+-------------------+---------------+---------- public | geography_columns | view | postgres public | geometry_columns | view | postgres public | spatial_ref_sys | table | postgres public | trips_yellow | foreign table | postgres (4 rows) # `s3://yassan-test/data-yellow_trip/` を参照する外部テーブルを作成 postgres=# CREATE FOREIGN TABLE pq_tbl_yellow_tripdata() SERVER pg_lake OPTIONS (path 's3://yassan-test/data-yellow_trip/*.parquet') ; CREATE FOREIGN TABLE # 外部テーブルの存在を確認 postgres=# \d List of relations Schema | Name | Type | Owner --------+------------------------+---------------+---------- public | geography_columns | view | postgres public | geometry_columns | view | postgres public | pq_tbl_yellow_tripdata | foreign table | postgres public | spatial_ref_sys | table | postgres public | trips_yellow | foreign table | postgres (5 rows) # 作成した外部テーブルの定義を確認するとParquetのメタデータを元に型を推測している事が分かる postgres=# \d pq_tbl_yellow_tripdata Foreign table "public.pq_tbl_yellow_tripdata" Column | Type | Collation | Nullable | Default | FDW options -----------------------+-----------------------------+-----------+----------+---------+------------- vendorid | integer | | | | tpep_pickup_datetime | timestamp without time zone | | | | tpep_dropoff_datetime | timestamp without time zone | | | | passenger_count | bigint | | | | trip_distance | double precision | | | | ratecodeid | bigint | | | | store_and_fwd_flag | text | | | | pulocationid | integer | | | | dolocationid | integer | | | | payment_type | bigint | | | | fare_amount | double precision | | | | extra | double precision | | | | mta_tax | double precision | | | | tip_amount | double precision | | | | tolls_amount | double precision | | | | improvement_surcharge | double precision | | | | total_amount | double precision | | | | congestion_surcharge | double precision | | | | airport_fee | double precision | | | | Server: pg_lake FDW options: (path 's3://yassan-test/data-yellow_trip/*.parquet')
適当にクエリしてみます。
postgres=# select count(*) from pq_tbl_yellow_tripdata ; count ---------- 81405872 (1 row) postgres=# SELECT date_trunc('month', tpep_pickup_datetime) AS month, count(*) AS trips, sum(total_amount)::numeric(18,2) AS total_sales, avg(trip_distance) AS avg_distance FROM pq_tbl_yellow_tripdata WHERE tpep_pickup_datetime >= timestamp '2024-01-01' AND tpep_pickup_datetime < timestamp '2025-11-01' GROUP BY 1 ORDER BY 1; month | trips | total_sales | avg_distance ---------------------+---------+--------------+------------------- 2024-01-01 00:00:00 | 2964617 | 79456198.67 | 3.652168742201813 2024-02-01 00:00:00 | 3007533 | 80073797.21 | 3.860857653099632 2024-03-01 00:00:00 | 3582611 | 97162539.11 | 4.517420758212249 2024-04-01 00:00:00 | 3514295 | 96620076.26 | 5.283848157311692 2024-05-01 00:00:00 | 3723843 | 105662491.74 | 5.367007577924154 2024-06-01 00:00:00 | 3539170 | 98854290.75 | 5.222029280876485 2024-07-01 00:00:00 | 3076876 | 86417596.46 | 5.111791086153442 2024-08-01 00:00:00 | 2979192 | 84225308.56 | 4.944860284264796 2024-09-01 00:00:00 | 3633025 | 103683171.72 | 5.74147173223362 2024-10-01 00:00:00 | 3833780 | 108992892.43 | 5.123551205337354 2024-11-01 00:00:00 | 3646372 | 100829501.83 | 5.369536251374905 2024-12-01 00:00:00 | 3668371 | 103889576.09 | 5.087793451098265 2025-01-01 00:00:00 | 3475236 | 89005350.57 | 5.855122296729696 2025-02-01 00:00:00 | 3577543 | 89547719.11 | 6.025345118143267 2025-03-01 00:00:00 | 4145231 | 108878221.10 | 6.584121432556175 2025-04-01 00:00:00 | 3970568 | 105596876.87 | 6.994721848359639 2025-05-01 00:00:00 | 4591844 | 123430419.10 | 7.653431418836262 2025-06-01 00:00:00 | 4322949 | 118393545.05 | 7.478367660594344 2025-07-01 00:00:00 | 3898971 | 104599459.73 | 7.104746062485573 2025-08-01 00:00:00 | 3574080 | 94315391.11 | 7.08938401770422 2025-09-01 00:00:00 | 4251019 | 117809613.95 | 6.839372030563955 2025-10-01 00:00:00 | 4428687 | 119457540.64 | 6.696755654665819 (22 rows)
上記のようにバケットにあるParquetファイルをそのままにクエリ出来ていることが分かります。
ParquetファイルをIcebergテーブルにImportする
s3://yassan-test/data-yellow_trip/*.parquet をIcebergテーブルにImportしてみます。
-- コピー先のIcebergテーブルを作成 postgres=# CREATE TABLE iceberg_yellow_tripdata (LIKE pq_tbl_yellow_tripdata) USING iceberg; CREATE TABLE postgres=# \d iceberg_yellow_tripdata Foreign table "public.iceberg_yellow_tripdata" Column | Type | Collation | Nullable | Default | FDW options -----------------------+-----------------------------+-----------+----------+---------+------------- vendorid | integer | | | | tpep_pickup_datetime | timestamp without time zone | | | | tpep_dropoff_datetime | timestamp without time zone | | | | passenger_count | bigint | | | | trip_distance | double precision | | | | ratecodeid | bigint | | | | store_and_fwd_flag | text | | | | pulocationid | integer | | | | dolocationid | integer | | | | payment_type | bigint | | | | fare_amount | double precision | | | | extra | double precision | | | | mta_tax | double precision | | | | tip_amount | double precision | | | | tolls_amount | double precision | | | | improvement_surcharge | double precision | | | | total_amount | double precision | | | | congestion_surcharge | double precision | | | | airport_fee | double precision | | | | Server: pg_lake_iceberg FDW options: (catalog 'postgres', location 's3://yassan-test/pg_lake/postgres/public/iceberg_yellow_tripdata/28585') -- Parquetファイルを参照している外部テーブルを作成したIcebergテーブルにCOPYする postgres=# COPY iceberg_yellow_tripdata FROM 's3://yassan-test/data-yellow_trip/*.parquet'; COPY 81405872 postgres=# select count(*) from iceberg_yellow_tripdata ; count ---------- 81405872 (1 row) -- 同じクエリしてみる postgres=# SELECT date_trunc('month', tpep_pickup_datetime) AS month, count(*) AS trips, sum(total_amount)::numeric(18,2) AS total_sales, avg(trip_distance) AS avg_distance FROM iceberg_yellow_tripdata WHERE tpep_pickup_datetime >= timestamp '2024-01-01' AND tpep_pickup_datetime < timestamp '2025-11-01' GROUP BY 1 ORDER BY 1; month | trips | total_sales | avg_distance ---------------------+---------+--------------+------------------- 2024-01-01 00:00:00 | 2964617 | 79456198.67 | 3.652168742201802 2024-02-01 00:00:00 | 3007533 | 80073797.21 | 3.860857653099784 2024-03-01 00:00:00 | 3582611 | 97162539.11 | 4.517420758212385 2024-04-01 00:00:00 | 3514295 | 96620076.26 | 5.283848157311756 2024-05-01 00:00:00 | 3723843 | 105662491.74 | 5.367007577924266 2024-06-01 00:00:00 | 3539170 | 98854290.75 | 5.222029280876606 2024-07-01 00:00:00 | 3076876 | 86417596.46 | 5.111791086153619 2024-08-01 00:00:00 | 2979192 | 84225308.56 | 4.944860284264946 2024-09-01 00:00:00 | 3633025 | 103683171.72 | 5.741471732234034 2024-10-01 00:00:00 | 3833780 | 108992892.43 | 5.123551205337748 2024-11-01 00:00:00 | 3646372 | 100829501.83 | 5.369536251375254 2024-12-01 00:00:00 | 3668371 | 103889576.09 | 5.087793451098546 2025-01-01 00:00:00 | 3475236 | 89005350.57 | 5.855122296729983 2025-02-01 00:00:00 | 3577543 | 89547719.11 | 6.025345118143695 2025-03-01 00:00:00 | 4145231 | 108878221.10 | 6.584121432556849 2025-04-01 00:00:00 | 3970568 | 105596876.87 | 6.994721848359854 2025-05-01 00:00:00 | 4591844 | 123430419.10 | 7.65343141883677 2025-06-01 00:00:00 | 4322949 | 118393545.05 | 7.47836766059443 2025-07-01 00:00:00 | 3898971 | 104599459.73 | 7.104746062486313 2025-08-01 00:00:00 | 3574080 | 94315391.11 | 7.089384017704826 2025-09-01 00:00:00 | 4251019 | 117809613.95 | 6.839372030564524 2025-10-01 00:00:00 | 4428687 | 119457540.64 | 6.696755654666752 (22 rows)
Icebergテーブルのクエリ結果をS3に出力する
次に、集計したクエリをParquetファイルに出力してみます。
以下の通り、COPY (クエリ) TO 's3://bucket/to/prefix/hoge.parquet' で出力出来る事が分かる。
-- 集計クエリをバケットにParquetファイルで出力する COPY ( SELECT date_trunc('month', tpep_pickup_datetime) AS month, count(*) AS trips, sum(total_amount)::numeric(18,2) AS total_sales, avg(trip_distance) AS avg_distance FROM iceberg_yellow_tripdata WHERE tpep_pickup_datetime >= timestamp '2024-01-01' AND tpep_pickup_datetime < timestamp '2025-11-01' GROUP BY 1 ORDER BY 1 ) TO 's3://yassan-test/marts/yellow_monthly_202401_202510.parquet'; -- 出来てるか確認する為、出力先を外部参照してクエリしてみる CREATE FOREIGN TABLE pq_tbl_yellow_monthly() SERVER pg_lake OPTIONS (path 's3://yassan-test/marts/yellow_monthly_*.parquet') ; postgres=# \d pq_tbl_yellow_monthly Foreign table "public.pq_tbl_yellow_monthly" Column | Type | Collation | Nullable | Default | FDW options --------------+-----------------------------+-----------+----------+---------+------------- month | timestamp without time zone | | | | trips | bigint | | | | total_sales | numeric(18,2) | | | | avg_distance | double precision | | | | Server: pg_lake FDW options: (path 's3://yassan-test/marts/yellow_monthly_*.parquet') postgres=# select count(*) from pq_tbl_yellow_monthly ; count ------- 22 (1 row) postgres=# select * from pq_tbl_yellow_monthly ; month | trips | total_sales | avg_distance ---------------------+---------+--------------+------------------- 2024-01-01 00:00:00 | 2964617 | 79456198.67 | 3.65216874220182 2024-02-01 00:00:00 | 3007533 | 80073797.21 | 3.860857653099766 2024-03-01 00:00:00 | 3582611 | 97162539.11 | 4.517420758212356 2024-04-01 00:00:00 | 3514295 | 96620076.26 | 5.283848157311716 2024-05-01 00:00:00 | 3723843 | 105662491.74 | 5.367007577924155 2024-06-01 00:00:00 | 3539170 | 98854290.75 | 5.22202928087656 2024-07-01 00:00:00 | 3076876 | 86417596.46 | 5.111791086153628 2024-08-01 00:00:00 | 2979192 | 84225308.56 | 4.944860284264978 2024-09-01 00:00:00 | 3633025 | 103683171.72 | 5.741471732234106 2024-10-01 00:00:00 | 3833780 | 108992892.43 | 5.123551205337798 2024-11-01 00:00:00 | 3646372 | 100829501.83 | 5.369536251375229 2024-12-01 00:00:00 | 3668371 | 103889576.09 | 5.087793451098548 2025-01-01 00:00:00 | 3475236 | 89005350.57 | 5.855122296730121 2025-02-01 00:00:00 | 3577543 | 89547719.11 | 6.025345118143695 2025-03-01 00:00:00 | 4145231 | 108878221.10 | 6.584121432556826 2025-04-01 00:00:00 | 3970568 | 105596876.87 | 6.994721848359891 2025-05-01 00:00:00 | 4591844 | 123430419.10 | 7.653431418837025 2025-06-01 00:00:00 | 4322949 | 118393545.05 | 7.478367660594162 2025-07-01 00:00:00 | 3898971 | 104599459.73 | 7.10474606248644 2025-08-01 00:00:00 | 3574080 | 94315391.11 | 7.08938401770497 2025-09-01 00:00:00 | 4251019 | 117809613.95 | 6.839372030564667 2025-10-01 00:00:00 | 4428687 | 119457540.64 | 6.696755654666566 (22 rows)
せっかくなので、Icebergテーブル iceberg_yellow_tripdata のメタデータをちょっと覗いてみます。
$ export BUCKET_NAME=yassan-test $ docker compose exec spark spark-submit \ --conf spark.log.level=WARN \ --conf spark.jars.packages=org.postgresql:postgresql:42.7.8,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \ --conf spark.jars.ivy=/opt/spark/ivy-cache \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.defaultCatalog=postgres \ --conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.postgres.type=jdbc \ --conf spark.sql.catalog.postgres.uri=jdbc:postgresql://host.docker.internal:5432/postgres \ --conf spark.sql.catalog.postgres.jdbc.user=postgres \ --conf spark.sql.catalog.postgres.jdbc.password=StrongPassword_here \ --conf spark.sql.catalog.postgres.jdbc.verifyServerCertificate=false \ --conf spark.sql.catalog.postgres.jdbc.useSSL=false \ --conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \ --conf spark.sql.catalog.postgres.warehouse=s3://${BUCKET_NAME}/pg_lake \ --conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.postgres.s3.endpoint=https://s3.isk01.sakurastorage.jp \ /opt/spark/scripts/check_iceberg_tbl_metadata.py \ postgres.public.iceberg_yellow_tripdata
▶ 🗒️
postgres.public.iceberg_yellow_tripdata のメタデータの状態を確認した際の実行ログ
$ docker compose exec spark spark-submit \
--conf spark.log.level=WARN \
--conf spark.jars.packages=org.postgresql:postgresql:42.7.8,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \
--conf spark.jars.ivy=/opt/spark/ivy-cache \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.defaultCatalog=postgres \
--conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.postgres.type=jdbc \
--conf spark.sql.catalog.postgres.uri=jdbc:postgresql://host.docker.internal:5432/postgres \
--conf spark.sql.catalog.postgres.jdbc.user=postgres \
--conf spark.sql.catalog.postgres.jdbc.password=StrongPassword_here \
--conf spark.sql.catalog.postgres.jdbc.verifyServerCertificate=false \
--conf spark.sql.catalog.postgres.jdbc.useSSL=false \
--conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \
--conf spark.sql.catalog.postgres.warehouse=s3://${BUCKET_NAME}/pg_lake \
--conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.postgres.s3.endpoint=https://s3.isk01.sakurastorage.jp \
/opt/spark/scripts/check_iceberg_tbl_metadata.py \
postgres.public.iceberg_yellow_tripdata
WARNING: Using incubator modules: jdk.incubator.vector
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /opt/spark/ivy-cache/cache
The jars for the packages stored in: /opt/spark/ivy-cache/jars
org.postgresql#postgresql added as a dependency
org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3c034c47-bdd7-40a6-9635-977db416a74f;1.0
confs: [default]
found org.postgresql#postgresql;42.7.8 in central
found org.checkerframework#checker-qual;3.49.5 in central
found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 in central
found org.apache.iceberg#iceberg-aws-bundle;1.10.1 in central
:: resolution report :: resolve 145ms :: artifacts dl 5ms
:: modules in use:
org.apache.iceberg#iceberg-aws-bundle;1.10.1 from central in [default]
org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 from central in [default]
org.checkerframework#checker-qual;3.49.5 from central in [default]
org.postgresql#postgresql;42.7.8 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 4 | 0 | 0 | 0 || 4 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3c034c47-bdd7-40a6-9635-977db416a74f
confs: [default]
0 artifacts copied, 4 already retrieved (0kB/9ms)
26/02/22 01:01:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/22 01:01:12 INFO SparkContext: Running Spark version 4.0.1
26/02/22 01:01:12 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64
26/02/22 01:01:12 INFO SparkContext: Java version 17.0.17
Setting Spark log level to "WARN".
26/02/22 01:01:17 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
現在のデフォルトカタログ: postgres
==============================================================================================================
[HISTORY]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.history ORDER BY made_current_at DESC
-- 結果 --
+--------------------+-------------------+---------+-------------------+
| made_current_at| snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2026-02-21 09:15:...|5403871763539957193| -1| true|
+--------------------+-------------------+---------+-------------------+
==============================================================================================================
[SNAPSHOTS]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.snapshots ORDER BY committed_at DESC
-- 結果 --
+--------------------+-------------------+---------+---------+------------------+-------+
| committed_at| snapshot_id|parent_id|operation| manifest_list|summary|
+--------------------+-------------------+---------+---------+------------------+-------+
|2026-02-21 09:15:...|5403871763539957193| -1| append|s3://yassan-tes...| {}|
+--------------------+-------------------+---------+---------+------------------+-------+
==============================================================================================================
[METADATA_LOG_ENTRIES]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.metadata_log_entries ORDER BY timestamp DESC
-- 結果 --
+--------------------+------------------+-------------------+----------------+----------------------+
| timestamp| file| latest_snapshot_id|latest_schema_id|latest_sequence_number|
+--------------------+------------------+-------------------+----------------+----------------------+
|2026-02-21 09:15:...|s3://yassan-tes...|5403871763539957193| 0| 1|
|2026-02-21 09:12:...|s3://yassan-tes...| NULL| NULL| NULL|
+--------------------+------------------+-------------------+----------------+----------------------+
==============================================================================================================
[MANIFESTS]
SELECT content, path, length, partition_spec_id, added_snapshot_id FROM postgres.public.iceberg_yellow_tripdata.manifests ORDER BY added_snapshot_id DESC
-- 結果 --
+-------+------------------+------+-----------------+-------------------+
|content| path|length|partition_spec_id| added_snapshot_id|
+-------+------------------+------+-----------------+-------------------+
| 0|s3://yassan-tes...| 8691| 0|5403871763539957193|
+-------+------------------+------+-----------------+-------------------+
==============================================================================================================
[FILES (データファイル)]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.files
-- 結果 --
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+
|content| file_path|file_format|spec_id|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts| lower_bounds| upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|first_row_id|referenced_data_file|content_offset|content_size_in_bytes| readable_metrics|
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+
| 0|s3://yassan-tes...| PARQUET| 0| 25322861| 518520071| NULL| NULL| NULL| NULL|{1 -> [01 00 00 0...|{1 -> [07 00 00 0...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|{{NULL, NULL, NUL...|
| 0|s3://yassan-tes...| PARQUET| 0| 25328936| 520064704| NULL| NULL| NULL| NULL|{1 -> [01 00 00 0...|{1 -> [07 00 00 0...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|{{NULL, NULL, NUL...|
| 0|s3://yassan-tes...| PARQUET| 0| 4206302| 85638572| NULL| NULL| NULL| NULL|{1 -> [01 00 00 0...|{1 -> [07 00 00 0...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|{{NULL, NULL, NUL...|
| 0|s3://yassan-tes...| PARQUET| 0| 26547773| 546084901| NULL| NULL| NULL| NULL|{1 -> [01 00 00 0...|{1 -> [06 00 00 0...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|{{NULL, NULL, NUL...|
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+
==============================================================================================================
[ENTRIES (マニフェストエントリ)]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.entries
-- 結果 --
+------+-------------------+---------------+--------------------+------------------+--------------------+
|status| snapshot_id|sequence_number|file_sequence_number| data_file| readable_metrics|
+------+-------------------+---------------+--------------------+------------------+--------------------+
| 1|5403871763539957193| 1| 1|{0, s3://yassan...|{{NULL, NULL, NUL...|
| 1|5403871763539957193| 1| 1|{0, s3://yassan...|{{NULL, NULL, NUL...|
| 1|5403871763539957193| 1| 1|{0, s3://yassan...|{{NULL, NULL, NUL...|
| 1|5403871763539957193| 1| 1|{0, s3://yassan...|{{NULL, NULL, NUL...|
+------+-------------------+---------------+--------------------+------------------+--------------------+
▶ 🗒️
check_iceberg_tbl_metadata.py の中身
#!/usr/bin/env python3 # /opt/spark/scripts/check_iceberg_tbl_metadata.py # # このスクリプトは、spark-submit --conf で設定された # Iceberg JDBC Catalog / S3 / 拡張設定を前提に動作する。 # ここではカタログ設定を一切行わない(起動時設定をそのまま使用)。 # # 使い方例: # spark-submit ... /opt/spark/scripts/check_iceberg_tbl_metadata.py \ # postgres.public.iceberg_yellow_tripdata # # 必須: # テーブル名は catalog.schema.table 形式で渡すこと。 # 例: postgres.public.iceberg_yellow_tripdata import argparse from typing import List, Tuple from pyspark.sql import SparkSession from pyspark.sql.utils import AnalysisException def make_queries(base: str) -> List[Tuple[str, str]]: """ Iceberg のメタデータテーブル一覧を生成する。 Iceberg Spark では以下の形式で参照できる: <table>.history <table>.metadata_log_entries <table>.snapshots <table>.entries <table>.files <table>.manifests """ return [ ("HISTORY", f"SELECT * FROM {base}.history ORDER BY made_current_at DESC"), ("SNAPSHOTS", f"SELECT * FROM {base}.snapshots ORDER BY committed_at DESC"), ("METADATA_LOG_ENTRIES", f"SELECT * FROM {base}.metadata_log_entries ORDER BY timestamp DESC"), ("MANIFESTS", f"SELECT content, path, length, partition_spec_id, added_snapshot_id FROM {base}.manifests ORDER BY added_snapshot_id DESC"), ("FILES (データファイル)", f"SELECT * FROM {base}.files"), ("ENTRIES (マニフェストエントリ)", f"SELECT * FROM {base}.entries"), ] def run_queries( spark: SparkSession, queries: List[Tuple[str, str]], limit: int, ) -> int: """ メタデータクエリを順番に実行する。 失敗したクエリ数を返す。 """ failed = 0 for title, sql in queries: print("\n" + "=" * 110) print(f"[{title}]") print(sql) try: df = spark.sql(sql) # print("\n-- スキーマ --") # df.printSchema() print("\n-- 結果 --") df.show(limit, truncate=True) except AnalysisException as e: failed += 1 print(f"\n!! AnalysisException: {e.desc}") except Exception as e: failed += 1 print(f"\n!! ERROR: {type(e).__name__}: {e}") return failed def main() -> None: parser = argparse.ArgumentParser( description="Iceberg テーブルのメタデータ(history/snapshots/files など)を確認するスクリプト" ) parser.add_argument( "table", help="対象テーブル名(catalog.schema.table 形式)" ) parser.add_argument( "--limit", type=int, default=100, help="各メタデータクエリの表示行数(デフォルト: 100)" ) args = parser.parse_args() spark = ( SparkSession.builder.appName(f"check_iceberg_tbl_metadata:{args.table}") .getOrCreate() ) # 現在のデフォルトカタログ確認(環境トラブル切り分け用) try: current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0] print(f"現在のデフォルトカタログ: {current_catalog}") except Exception: # Sparkバージョンによっては未対応のため無視 pass queries = make_queries(f"{args.table}") failed = run_queries(spark, queries, args.limit) spark.stop() if failed: raise SystemExit(f"{failed} 件のメタデータクエリが失敗しました。") if __name__ == "__main__": main()
ケース4:PostgreSQLってことはrollbackできるの?
pg_lakeの Iceberg tablesのBackupsの項目 を見ると、BEGINを使っていることが分かります。
pg_lakeはPostgreSQLなのでトランザクションが効くはず。という事は、Rollback出来るはずで、その場合、失敗して戻した際にIcebergテーブルとしてメタデータがどうなってるか見てみたい。
単純な1PCでお試ししてみます。
シナリオとしては、Icebergテーブルに対して、COPY ... FROM で、Icebergテーブルに複数COPYを実行し、途中でコケさせて、全部戻せるのかについて確認。
-- pg_lakeでIcebergテーブルを作る CREATE TABLE tbl_copy_txn () USING iceberg WITH (load_from = 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-01.parquet') ; -- トランザクション開始 BEGIN; -- 件数チェック SELECT count(*) FROM tbl_copy_txn ; -- データ投入(成功) COPY tbl_copy_txn FROM 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-02.parquet'; -- 件数チェック SELECT count(*) FROM tbl_copy_txn ; -- データ投入(失敗) COPY tbl_copy_txn FROM 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-99.parquet'; ROLLBACK; -- 件数チェック(テーブル作成時の件数になってるはず) SELECT count(*) FROM tbl_copy_txn ;
実際にやってみると、以下の様に戻せる事が分かる。
postgres=# CREATE TABLE tbl_copy_txn () USING iceberg WITH (load_from = 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-01.parquet') ; CREATE TABLE postgres=# \d tbl_copy_txn Foreign table "public.tbl_copy_txn" Column | Type | Collation | Nullable | Default | FDW options -----------------------+-----------------------------+-----------+----------+---------+------------- vendorid | integer | | | | tpep_pickup_datetime | timestamp without time zone | | | | tpep_dropoff_datetime | timestamp without time zone | | | | passenger_count | bigint | | | | trip_distance | double precision | | | | ratecodeid | bigint | | | | store_and_fwd_flag | text | | | | pulocationid | integer | | | | dolocationid | integer | | | | payment_type | bigint | | | | fare_amount | double precision | | | | extra | double precision | | | | mta_tax | double precision | | | | tip_amount | double precision | | | | tolls_amount | double precision | | | | improvement_surcharge | double precision | | | | total_amount | double precision | | | | congestion_surcharge | double precision | | | | airport_fee | double precision | | | | cbd_congestion_fee | double precision | | | | Server: pg_lake_iceberg FDW options: (catalog 'postgres', location 's3://yassan-test/pg_lake/postgres/public/tbl_copy_txn/58579') -- ここからトランザクション開始 postgres=# BEGIN; BEGIN postgres=*# SELECT count(*) FROM tbl_copy_txn ; count --------- 3475226 (1 row) -- データ投入 postgres=*# COPY tbl_copy_txn FROM 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-02.parquet'; COPY 3577543 postgres=*# SELECT count(*) FROM tbl_copy_txn ; count --------- 7052769 (1 row) -- あり得ないパスのデータを指定して失敗させる postgres=*# COPY tbl_copy_txn FROM 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-99.parquet'; ERROR: HTTP Error: Unable to connect to URL "https://s3.isk01.sakurastorage.jp/yassan-test/data-yellow_trip/yellow_tripdata_2025-99.parquet": 404 (Not Found). LINE 1: ..., cbd_congestion_fee::DOUBLE AS cbd_congestion_fee FROM read_parquet('s3://yassan-test/data-yellow_trip/yellow_tr... ^ -- ロールバック実施 postgres=!# ROLLBACK; ROLLBACK -- テーブル作成時のレコード数に戻っている 🙌 postgres=# SELECT count(*) FROM tbl_copy_txn ; count --------- 3475226 (1 row)
更にSpark使ってメタデータをチェックしてみると、失敗手前に成功していたCOPYも反映されていないこと(=テーブル作成時の状態)が分かります。
▶ 🗒️Sparkで確認した際のログを表示
$ docker compose exec spark spark-submit \ --conf spark.log.level=WARN \ --conf spark.jars.packages=org.postgresql:postgresql:42.7.8,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \ --conf spark.jars.ivy=/opt/spark/ivy-cache \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.defaultCatalog=postgres \ --conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.postgres.type=jdbc \ --conf spark.sql.catalog.postgres.uri=jdbc:postgresql://host.docker.internal:5432/postgres \ --conf spark.sql.catalog.postgres.jdbc.user=postgres \ --conf spark.sql.catalog.postgres.jdbc.password=StrongPassword_here \ --conf spark.sql.catalog.postgres.jdbc.verifyServerCertificate=false \ --conf spark.sql.catalog.postgres.jdbc.useSSL=false \ --conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \ --conf spark.sql.catalog.postgres.warehouse=s3://${BUCKET_NAME}/pg_lake \ --conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.postgres.s3.endpoint=https://s3.isk01.sakurastorage.jp \ /opt/spark/scripts/check_iceberg_tbl_metadata.py \ postgres.public.tbl_copy_txn WARNING: Using incubator modules: jdk.incubator.vector :: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml Ivy Default Cache set to: /opt/spark/ivy-cache/cache The jars for the packages stored in: /opt/spark/ivy-cache/jars org.postgresql#postgresql added as a dependency org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency org.apache.iceberg#iceberg-aws-bundle added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-13257d86-6fbd-4da8-9afe-141a2fad36ee;1.0 confs: [default] found org.postgresql#postgresql;42.7.8 in spark-list found org.checkerframework#checker-qual;3.49.5 in central found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 in central found org.apache.iceberg#iceberg-aws-bundle;1.10.1 in central :: resolution report :: resolve 258ms :: artifacts dl 15ms :: modules in use: org.apache.iceberg#iceberg-aws-bundle;1.10.1 from central in [default] org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 from central in [default] org.checkerframework#checker-qual;3.49.5 from central in [default] org.postgresql#postgresql;42.7.8 from spark-list in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 4 | 0 | 0 | 0 || 4 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-13257d86-6fbd-4da8-9afe-141a2fad36ee confs: [default] 0 artifacts copied, 4 already retrieved (0kB/10ms) 26/02/22 02:27:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties 26/02/22 02:27:12 INFO SparkContext: Running Spark version 4.0.1 26/02/22 02:27:12 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64 26/02/22 02:27:12 INFO SparkContext: Java version 17.0.17 Setting Spark log level to "WARN". 26/02/22 02:27:18 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1 現在のデフォルトカタログ: postgres ============================================================================================================== [HISTORY] SELECT * FROM postgres.public.tbl_copy_txn.history ORDER BY made_current_at DESC -- 結果 -- +--------------------+-------------------+---------+-------------------+ | made_current_at| snapshot_id|parent_id|is_current_ancestor| +--------------------+-------------------+---------+-------------------+ |2026-02-22 02:15:...|2538362807379914821| -1| true| +--------------------+-------------------+---------+-------------------+ ============================================================================================================== [SNAPSHOTS] SELECT * FROM postgres.public.tbl_copy_txn.snapshots ORDER BY committed_at DESC -- 結果 -- +--------------------+-------------------+---------+---------+------------------+-------+ | committed_at| snapshot_id|parent_id|operation| manifest_list|summary| +--------------------+-------------------+---------+---------+------------------+-------+ |2026-02-22 02:15:...|2538362807379914821| -1| append|s3://yassan-tes...| {}| +--------------------+-------------------+---------+---------+------------------+-------+ ============================================================================================================== [METADATA_LOG_ENTRIES] SELECT * FROM postgres.public.tbl_copy_txn.metadata_log_entries ORDER BY timestamp DESC -- 結果 -- +--------------------+------------------+-------------------+----------------+----------------------+ | timestamp| file| latest_snapshot_id|latest_schema_id|latest_sequence_number| +--------------------+------------------+-------------------+----------------+----------------------+ |2026-02-22 02:15:...|s3://yassan-tes...|2538362807379914821| 0| 0| +--------------------+------------------+-------------------+----------------+----------------------+ ============================================================================================================== [MANIFESTS] SELECT content, path, length, partition_spec_id, added_snapshot_id FROM postgres.public.tbl_copy_txn.manifests ORDER BY added_snapshot_id DESC -- 結果 -- +-------+------------------+------+-----------------+-------------------+ |content| path|length|partition_spec_id| added_snapshot_id| +-------+------------------+------+-----------------+-------------------+ | 0|s3://yassan-tes...| 8277| 0|2538362807379914821| +-------+------------------+------+-----------------+-------------------+ ============================================================================================================== [FILES (データファイル)] SELECT * FROM postgres.public.tbl_copy_txn.files -- 結果 -- +-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+ |content| file_path|file_format|spec_id|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts| lower_bounds| upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|first_row_id|referenced_data_file|content_offset|content_size_in_bytes| readable_metrics| +-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+ | 0|s3://yassan-tes...| PARQUET| 0| 3475226| 71615025| NULL| NULL| NULL| NULL|{1 -> [01 00 00 0...|{1 -> [07 00 00 0...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|{{NULL, NULL, NUL...| +-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+ ============================================================================================================== [ENTRIES (マニフェストエントリ)] SELECT * FROM postgres.public.tbl_copy_txn.entries -- 結果 -- +------+-------------------+---------------+--------------------+------------------+--------------------+ |status| snapshot_id|sequence_number|file_sequence_number| data_file| readable_metrics| +------+-------------------+---------------+--------------------+------------------+--------------------+ | 1|2538362807379914821| 0| 0|{0, s3://yassan...|{{NULL, NULL, NUL...| +------+-------------------+---------------+--------------------+------------------+--------------------+
バケットの中を見ても成功したデータが残ってることも無さそう。
$ aws --profile sakura s3 ls s3://yassan-test/pg_lake/postgres/public/tbl_copy_txn/58579 --recursive 2026-02-22 11:15:55 71615025 pg_lake/postgres/public/tbl_copy_txn/58579/data/39b771a9-c674-41db-be41-082824669071/data_0.parquet 2026-02-22 11:15:55 2466 pg_lake/postgres/public/tbl_copy_txn/58579/metadata/00000-ef326f50-924b-4228-91ff-8e1d1d673817.metadata.json 2026-02-22 11:15:55 8277 pg_lake/postgres/public/tbl_copy_txn/58579/metadata/c6744213-3a56-4377-99ed-6db4295403df-m0.avro 2026-02-22 11:15:55 3796 pg_lake/postgres/public/tbl_copy_txn/58579/metadata/snap-2538362807379914821-1-c6744213-3a56-4377-99ed-6db4295403df.avro
おまけ:もしかして、2PC(分散トランザクション)っていける?
結論: ダメでした。
PREPARE TRANSACTION使って、2PC(分散トランザクション)いけるのか?って期待したい所ですが、以下のテストにあるように、少なくとも「Icebergメタデータ変更を含む書き込み」では pg_lake側が禁止していました。
ここでは触れませんが、PostgreSQLの設定に max_prepared_transactions = 10 を追加して PREPARE TRANSACTION 使えるようにを試したら、確かにテストの通り、cannot prepare a transaction that has Iceberg metadata changes となりました(て言うか、やってから気がついた)。
使いどころ(ユースケース)を考えてみる
個人的にはpg_lakeの「使い所」は大きくこのあたりだと考えます。
「PostgreSQL中心でやりたい」組織のレイクハウス基盤
状況
- 既にアプリもBIも「全部PostgreSQL前提」で回している
- だけど履歴データやログが肥大化して、
- 本番DBに置き続けると遅い・コスト高い
- とはいえ別のDWH・Sparkクラスターを立てるのは大ごと
pg_lake の使い所
- Iceberg テーブルを PostgreSQLの
CREATE TABLE ... USING icebergだけで作成・更新 できる - PostgreSQL トランザクション境界に統合される(少なくとも COPY などでロールバックが効くことを確認した)
- 一方で現状では 2PC は禁止される(Iceberg メタデータ変更を含むと prepare できない)
- データファイル自体は S3/Blob/GCS 上の Parquet に置くから、ストレージコストやスケールはData Lakehouseの様に考えられる
😆 ポイント
- OLTPは引き続きPostgreSQL、履歴・集計はIcebergテーブルへ
- でも アプリやBIは全部PostgreSQLに接続、というシンプルな構成を作れる
- テーブルがデカくなった場合にフルスキャンしたら詰むので慎重さは必要
Iceberg 中心のデータ基盤に「PostgreSQLの窓口」を付けたい
状況
- 既に S3 + Iceberg + Spark/Trino といったData Lakehouse基盤がある
- そこに「PostgreSQLからも同じIcebergを触りたい」ニーズがある
- PostgreSQL 互換で扱いたい
- JDBC/ODBC で簡単に接続したい
pg_lake の使い所
- PostgreSQL互換インターフェースとしての「書き込み窓口」とする
- 利用者はPostgreSQLとしてpg_lake経由にIcebergテーブルを意識せずに書き込みし、既存のData Lakehouseのクエリエンジンがpg_lakeをIcebergテーブルとして参照して取り込む
😆 ポイント
- Icebergカタログの軽量なPostgreSQLフロント
- アプリケーション または 運用チーム向けの窓口
- 細粒度の頻繁な INSERT は small files 問題を引き起こす可能性があるので注意は必要
- pg_lakeをApache Polaris(Iceberg REST Catalog)として連携して使うとより使い勝手が上がりそう12
S3上のファイルを“そのまま”分析したい
状況
- S3 に CSV/JSON/Parquet が山ほどある
- ざっと中身を見たい・軽く集計したいが、毎回ETLやSparkジョブを書くのは面倒
pg_lake の使い所
CREATE FOREIGN TABLE ... SERVER pg_lake OPTIONS (path 's3://...')で可能にする- 外部テーブルのスキーマのカラムの型は、参照するファイルから自動で推論される
- 実際のスキャン・集計は DuckDB のカラムナエンジンが担当するので、 大きめのファイルでもPostgreSQL単体よりだいぶ速い
😆 ポイント
- 「ちゃちゃっとつまみ食いしたい」を Postgres + SQL だけで済ませられる
- Spark/Trinoを使う前の “最初の見る・試す” レイヤーとして使うイメージ
"軽量な"データパイプライン・ETLの代替
状況
- バッチで全部DigdagやAirflowでワークフローを作るまでもないが、 ちょっとしたデータ移動・変換処理は定期的に走らせたい
- でも「S3 ↔ Iceberg ↔ PostgreSQL」を行き来する処理は増えていく
pg_lake の使い所
COPY FROM 's3://...'/COPY TO 's3://...'で S3⇔PostgreSQL/Icebergのロード・アンロードをSQLだけで記述- PL/pgSQL や
DOブロックと組み合わせて、 「月次ファイルをループして取り込む」「集計結果をParquetで吐き出す」といった処理を 全部PostgreSQL側のトランザクション下で完結できる
😆 ポイント
- 大掛かりなETL基盤までは要らないけど、手作りスクリプトも増やしたくない
- そこまで大きくない組織・チームの SQL中心のミドルクラスETL基盤として便利
地理空間データのレイクハウス化
ハンズオンでは取り上げませんでしたが、pg_lake_spatial 拡張を使うと PostGIS と組み合わせて
地理空間データも扱えます13。
状況
- PostGISで地理空間分析しているが、 元データ(Shapefile/GeoJSON/Parquetなど)はオブジェクトストレージに大量にある
- 全部PostgreSQLに入れちゃうとサーバスペック的に厳しい
pg_lake の使い所
pg_lake_spatialが GDAL(Geospatial Data Abstraction Library) を経由して各種GEOフォーマットを読める- 外部ファイルとして読みつつ、PostGIS と組み合わせて空間クエリをかけられる
- 必要な粒度でだけ Iceberg テーブルに入れとく、といった分割も可能
😆 ポイント
地理空間 ✕ データレイク として、生ファイル+PostGIS+Iceberg を1つのPostgreSQLエンドポイントで扱える
「開発向け運用分析」基盤
状況
- マイクロサービスやアプリが多数あり、それぞれPostgreSQLを使っている
- アプリ側で 軽い分析 または レポート(例:機械学習用特徴量抽出)なども求められる
pg_lake の使い所
- PostgreSQLの通常のheapテーブル・Icebergテーブル・S3上のファイルを1つのSQLでJOIN可能
- DuckDB側に重たい集計を寄せて、オンライン系PostgreSQLの負荷を抑えながら それでもアプリは「PostgreSQLに繋ぐだけ」で済む
😆 ポイント
- ユースケース1~3の組み合わせ
- 「アプリケーションDBのすぐ隣に小さなレイクハウスを生やす」ことで、 運用と分析のギャップを最小限にする
pg_lake を“使わない方がいい”ケース
ユースケースではないのですが、"NOT" ユースケースとして、
以下のようなケースでは、pg_lake を使わずに Spark/Flink/Trino を主役にした方が無難です。
- 既に完全に成熟した大規模レイクハウス(Spark/Flink/Trino中心)があり、 PostgreSQLは本当にOLTP専用に閉じておきたい場合
- 数百TB〜PBクラスを、多数のクエリエンジンが同時に叩くような超大規模分析クラスタ
- きついSLA付きの超重バッチを全部PostgreSQLベースで回そうとしている場合
1ノードではどうしてもスループットを出すには限界があるので、このあたりは、素直に Spark/Flink/Trino を主役にして、PostgreSQLはメタデータや小さなマート用に留める方が安全です。
落とし穴と運用Tips
これまでの話を踏まえて、落とし穴と運用Tipsを整理します
公式Docker Composeはdownしたら揮発する
- 落とし穴
- データ永続化前提ではない構成14
- PostgreSQLカタログが消えるとIceberg整合が崩れる
- Tips
- 本番はソースビルド or 永続ボリューム明示
- PostgreSQLのバックアップ必須(物理/論理)
Iceberg JDBCカタログ互換は限定的
- 落とし穴
- SparkはV0指定で読めるが、V1自動マイグレーションで失敗
- TrinoではV0指定してもV1を要求されエラー
- Tips
- pg_lakeをメインのIcebergカタログにしない
- 既存Data LakehouseがあるならPolaris/HMS中心に
2PC不可(1PCのみ)
- 落とし穴
PREPARE TRANSACTIONはIceberg変更を含むと禁止扱いとなる- 分散トランザクション基盤にはならない
- Tips
- 期待できるのは1PCの原子性のみ
Small FilesとVACUUM競合
- 落とし穴
- 細かいInsertを高頻度に実行すると自動VACUUM(10分間隔)と更新が衝突する可能性がある
- Tips
- そもそもそういった用途には向かない
- VACUUMはテーブルの利用に合わせて手動と自動に分けて運用するのが良さそう
pgduck_serverはメモリ無限ではない
- 落とし穴
- 大規模スキャンでOOMの可能性
- S3ファイルはローカルキャッシュされる
- Tips
--memory_limitと--cache_dirを明示して利用する- 実行計画とpushdownを確認する
まとめ
今回、Deep Diveしてみて個人的に pg_lake は、「Iceberg カタログそのもの」になるツールというよりも、
既存の PostgreSQL 世界とデータレイクをつなぐ“入口”として機能するツール
として捉えると設計判断がしやすいと考えます。 つまり、
PostgreSQL派にとっては、、、
「Snowflake風 or BigQuery風のことを、既存スキル+SQLだけでやるための延長線」
Iceberg派にとっては、、、
「既存のIcebergのData Lakehouseに、PostgreSQLという強力な“入口”を増やす手段」
この2つの視点でプロジェクトやチームのスケール、運用体制、既存スタックを見ながら 「どこまでをpg_lakeを活用するか」を決めていくのが良さそうです。
最後に
去年の年末は個人的に色々あって間に合わなかった分、気になってる所を結構潰しました。
心残りはコンテナで揮発しない構成を検討したが、やるには大規模改造になるのでここでは止めました。
pg_lakeは以下のドキュメントにあるように、Icebergテーブルを使ってて「これ欲しいんだよなぁ」ってなるIcebergテーブルへのImport/Exportがとても使いやすいです。
- pg_lake/docs/data-lake-import-export.md at main · Snowflake-Labs/pg_lake
- pg_lake/docs/file-formats-reference.md at main · Snowflake-Labs/pg_lake
使い所にも挙げましたがハマるところもあるんじゃないでしょうか。これからもウォッチしていきます。 また、この記事書いてる途中にApache Polarisへの対応も出てきたのですが、これはまた別の機会に。
以上、この記事は さくらインターネット Advent Calendar 2025 と Distributed Computing Advent Calendar 2025 の16日目の記事でした。
これで2025年が越せました。
-
今回は取り上げないですが、pg_lakeでのIcebergテーブルとしての書き込みはまた制限が大きいです。以下で書き込みがサポート始まってきました。
github.com↩ - Snowflake-Labs/pg_lakeのREADME より引用↩
- ソースだとこの辺を参考にしてます↩
- pg_map は 「key/value ペアを composite 型で表し、それを配列(domain)にしたもの」を map として扱う、という方針です。詳しくは pg_lake/docs/pg_map.md を参照。↩
-
何やってるのかは
pg_lake/docker/Taskfile.ymlを見ると分かります。↩ -
本来は専用のユーザを用意するべきだし、
pg_hba.confももっと厳密に設定すべきです。今回はサンプル用なのでザル設定にしています。例えば、実運用では0.0.0.0/0のような全許可は避けるべきで。また、Docker ブリッジの CIDR に限定する、127.0.0.1のみ公開する、専用ユーザを作成する等の制限を必ず行ってください。↩ -
さくらのオブジェクトストレージを利用するためには、pgduck_serverがアクセスできるように、S3シークレットを作成する必要があります。S3シークレットの作成方法については、以下のドキュメントを参照してください。
パーミッション設定によるアクセスキーの発行 - 基本操作 | さくらのクラウド マニュアル↩ -
例えば、
PROVIDER credential_chainとCHAIN 'env'を使い、KEY_IDとSECRETを使わないで、ENV経由で渡すなど。↩ -
本来なら必要な依存JARをSparkイメージに追加する方が良いのですが、今回は簡単のために
spark.jars.packagesで指定しています。
その場合、JARのキャッシュディレクトリをsparkコンテナの実行ユーザから書き込みできないといけないので、/opt/spark/ivy-cacheをボリュームマウントして、sparkコンテナをrootユーザで実行しています。↩ - Sparkを使っているドキュメントを見ると--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1 となっているのでかなり古いSpark&Icebergで確認しているようです。↩
-
少なくとも pg_lake [1/22時点の
4bc1a58] / Spark v4.0.1 & Iceberg v1.10.1 / Trino v479 の組み合わせでは。↩ - Snowflake-Labs/pg_lake#83 や Snowflake-Labs/pg_lake#68 を見ると分かるようにREST APIカタログへの対応が進んでいて、Apache Polarisと組み合わせることで夢が広がる。↩
- 詳しくは pg_lake/docs/spatial.md を参照。これでもう1本書きたいところ。↩
-
リポジトリにあるコンテナ環境 https://github.com/Snowflake-Labs/pg_lake/tree/main/docker は、
docker compose downすると揮発するので本番には向かない。
本番運用で使うなら、コンテナ使わずにソースビルドして使う方が無難↩