以下の内容はhttps://yassan.hatenablog.jp/entry/advent-calendar-2025-1216より取得しました。


PostgreSQLをIcebergレイクハウスの“入口”にする ~ pg_lakeを検証して分かったことメモ~

この記事は さくらインターネット Advent Calendar 2025Distributed Computing Advent Calendar 2025 の16日目の記事です。

12/16って言ったら12/16なんです。

qiita.com

qiita.com

今回は、この記事を読んで pg_lake に興味が湧いたので触ってみました。

www.snowflake.com

では、始める前に、想定読者と前提は以下の通りです。

  • Iceberg大好きっ子
  • Docker Compose / Docker イメージ作成は一通り分かる人
  • PostgreSQLを利用したことがある人
  • pg_lakeは 1/22時点の4bc1a58のものを前提とする
  • 実行環境は PC端末上のWSL2のUbuntu環境とする

ハンズオンでは、最終的に以下が出来る事を目指します。

  1. pg_lakeの実行環境をビルドする
  2. Docker Composeでpg_lakeを起動(S3互換ストレージには、さくらのオブジェクトストレージを利用)
  3. pg_lakeの外から、SparkとTrinoを使ってpg_lakeのIcebergテーブルを参照する

では、以下の目次の流れで進めていきます。

pg_lakeの面白さ

毎回、CSVもらうけど、ちょっとJOINして見たいみたいだけなのにダルいなー

とか、、、

PostgreSQLのテーブルのデータをデータレイクに同期したい(いわゆるCDC)を考えた場合、バッチで定期的にもっていく? もしくは、今どきに、Kafka、Schema Registry、Kafka Connect用意して(あるいはDebezium)流れをつくる?

そもそもデータレイクもデータレイクハウスとしてIceberg・Delta Lake・Hudiにしちゃう?

いやいやいやいや、うち、そんなデカいものもってないし、、、そんな予算通らないって orz

なんて思うことありませんか。

そもそもさー、ちょっと前に流行ったこの記事にもあるように、抱えてるデータってそこまでデカくないんだし、1インスタンスで良くない?

motherduck.com

なんて思うことないですか?

そんな人たちに pg_lake がささりそうです。

とてもざっくりいうと、pg_lakeは、PostgreSQLの拡張です。

しかも、PostgreSQLの世界のまま、以下が実現出来るらしいです。

  • Icebergテーブルを作り
  • Icebergクエリエンジン(Spark・Trino ・Hive)から、PostgreSQLで作ったIcebergテーブルをR/Wしたい1
  • CSV/JSON/ParquetをCOPY FROM 's3://...' で簡単にImport(Trinoでやりたいやつ)
  • ロードすらダルいので、CSV/JSON/Parquetを「外部テーブル」として即クエリ
  • 逆にPostgreSQLから COPY TO 's3://...' でExportも出来る
  • PostgreSQLに収まらない地理空間データを一部だけ読みたい場合にS3にデータを置いといて、PostGIS と組み合わせて、欲しいところだけ読みたい

中身どうなってるか気になりますよね?っという事でDeep Diveしていきます。

pg_lakeのざっくり全体像

pg_lake公式のREADMEを元に説明します。

アーキテクチャは下図の通り。

Snowflake-Labs/pg_lakeにあるアーキテクチャ図
pg_lakeのアーキテクチャ2

アーキテクチャ

大きく2層構造になっています。

PostgreSQL + pg_lake拡張(図の左側の青文字):

制御プレーン(DDL・トランザクション・カタログ)を担当。

CREATE TABLE ... USING iceberg、外部ファイルを参照する外部テーブル、COPY ... FROM/TO を提供。

pg_lake は Iceberg JDBC Catalog 互換の メタテーブル(/ビュー)群を提供し、例として iceberg_tables(ユーザ向け)や lake_iceberg.*(内部/互換用)から参照できる。

pgduck_server(DuckDB)(図の右側の緑文字):

データプレーンとして実データのスキャン・フィルタ・集計などデータ処理を担当。

別プロセスとして UNIX ドメインソケット(既定 dir=/tmp, port=5332)で PostgreSQL と通信し、列指向・並列でスキャン/集計を実行します。--cache_dir でリモートファイルのキャッシュも可能です。

ストレージはオブジェクトストレージ(S3/Blob/GCSなど)を利用。

Iceberg の metadata.json と Parquet、および CSV/JSON/Parquet などの“生データ”は オブジェクトストレージ に置く。既定のIcebergテーブルの保存先は pg_lake_iceberg.default_location_prefix で設定します。

実行フロー
PostgreSQL が解析 → プッシュダウン可能な処理を pgduck_server が実行 → 結果を PostgreSQL に返却。
またデフォルトではフォールバックする設計だが、設定(例:pg_lake_table.enable_strict_pushdown)により厳格化できる。

相互運用 pg_lake は PostgreSQL をバックエンドとする Iceberg JDBC Catalog 実装を提供する。そのため Spark/Trino などは Iceberg の JDBC Catalog として PostgreSQL に接続できる。

⚠️ただし pg_lake のカタログ実装は Hive Metastore や Apache Polaris のような独立したカタログサーバとは設計思想が異なる。pg_lake は PostgreSQL 拡張として Iceberg のメタデータを管理する構成です。 Hive Metastore や Apache Polaris のような「独立したカタログサーバ」を置き換える用途は想定しない方が安全です。

コンポーネント

主なコンポーネントは以下の通り。

  • pg_lake_iceberg
    PostgreSQL から Iceberg を管理(USING icebergiceberg_tables、パーティション、VACUUM によるSmall File/古いスナップショット整理、自動VACUUM(10分間隔) など)。
  • pg_lake_table(FDW)
    Parquet/CSV/JSON/外部 Iceberg/Delta/GDAL(地理系) を外部テーブルとして即クエリ。create foreign table ... options (path 's3://mybucket/logs/*.csv.gz', filename 'true') で外部テーブル作成し、SELECT path FROM lake_file.list('s3://mybucket/logs/*.csv.gz') で探索も出来る。
  • pg_lake_copy
    COPY ... FROM/TO 's3://...'データレイク⇄PostgreSQL/Iceberg を往復。形式/圧縮の自動判別やオプション指定に対応。\copy も可。
  • pgduck_server(外部プロセス)
    DuckDB をロードする実行エンジン。PostgreSQL とはソケット接続、--memory_limit/--cache_dir/--init_file_path 等でチューニング可能。
  • S3(オブジェクトストレージ) Iceberg/ファイルの実体置き場。PostgreSQL はカタログ状態を DB 内に保持、pgduck_server は必要に応じてローカルキャッシュ。
  • 周辺ユーティリティ
    変換(CSV/JSON→Parquet)やログ取り込みなどの手順・例がまとまっており、“ログをIcebergテーブルにインクリメンタルに変換” を少ない手数で運用可能。

デカいIcebergテーブルを読んだら即OOMなんじゃ?

って考えますよね。

pg_lakeでは、Icebergテーブルの仕組みを上手く活用しています。
つまり、(条件が揃えば)pushdown や列指向スキャンで効率化できます。ただし pruning / pushdown の対応範囲はテーブル種別や設定に依存し、pgduck_server のメモリ制限設計は別途必要です。

pruningや制限事項の詳細については、以下を参照ください。

https://github.com/Snowflake-Labs/pg_lake/blob/main/docs/iceberg-tables.md#partition-pruning

PostgreSQLテーブルの型とIcebergテーブルの型の対応状況は?

ドキュメントに記載なかったのでソースから探してみました3

主なデータ型の対応関係を Postgres → Iceberg の向きで整理すると以下の通りです。

PostgreSQL 型 Iceberg Primitive Type ざっくり意味
INTEGER int 32-bit 符号付き整数
BIGINT long 64-bit 符号付き整数
TEXT, VARCHAR string UTF-8 文字列
TIMESTAMP timestamp タイムゾーン無し timestamp(マイクロ秒精度, ローカルタイム)
TIMESTAMPTZ timestamptz タイムゾーン付き timestamp(マイクロ秒精度, UTC 基準)
BOOLEAN boolean 真偽値
NUMERIC(p,s) decimal(p,s) 固定小数点(最大 precision 38)
REAL float 32-bit 浮動小数点
DOUBLE PRECISION double 64-bit 浮動小数点
DATE date 日付(epoch からの日数)
UUID uuid UUID
BYTEA binary バイナリデータ

外にも以下のような型も対応しています。
Icebergのmap型については、pg_lake独自PostgreSQL拡張機能 pg_map を使うことで利用できます4

PostgreSQL 型 Iceberg Primitive Type ざっくり意味
ARRAY(例: INTEGER[] list<…> 配列(要素型は元の配列要素に対応)
composite type(例: (a int, b text) struct<…> 複合型(行型)。フィールドが struct の fields として展開される
JSONB string JSON を文字列として格納(serialized JSON)
pg_map の map(例: map_type.key_text_val_int map<string,int> pg_map は内部的に composite+配列で表現するが、Iceberg では map としてスキーマ化される

他の型については、pg_lake/pg_lake_table/tests/pytests/test_iceberg_types.py にテストケースがまとまっているので、参考にしてください。 また、現状ではクエリする際の制限事項 もあるので詳しくはそこを参照ください。

Icebergテーブルのメンテナンス

Icebergテーブルでは、古いスナップショットや小さなデータファイルを削除といったメンテナンスが必要です。 pg_lake では Iceberg テーブルのメンテナンスとして VACUUM コマンド を使います。また、自動 VACUUM 機能があり、10分間隔で自動的に実行されます。

自動VACUUMをテーブル個別に無効化する場合は以下のようにします。

-- テーブル作成時に無効化する場合
CREATE TABLE test_auto_vacuum(id INT) USING iceberg WITH (autovacuum_enabled='False');

-- 途中から無効化する場合
CREATE TABLE other_test_auto_vacuum(id INT) USING iceberg;
ALTER FOREIGN TABLE other_test_auto_vacuum OPTIONS (ADD autovacuum_enabled 'false');

また、10分間隔の自動VACUUMの設定を変更したい場合は、pg_lake_iceberg.autovacuum_naptime(sec)を調整します。 ただ、現状は、テーブル個別に設定出来ないので、テーブル個別に設定したい場合は、自動VACUUMを無効化して、手動でVACUUMコマンドを実行する形になります。

他にもpg_lakeの設定値としては以下のものが有るようです。

postgres=# SELECT name, setting, unit, context, source
FROM pg_settings
WHERE name LIKE 'pg_lake%'
ORDER BY 1;
                         name                         |           setting         | unit |  context  |       source       
------------------------------------------------------+---------------------------+------+-----------+--------------------
 pg_lake_engine.cache_manager_interval                | 10000                     | ms   | superuser | default
 pg_lake_engine.enable_cache_manager                  | on                        |      | superuser | default
 pg_lake_engine.max_cache_size                        | 20480                     | MB   | superuser | default
 pg_lake_iceberg.autovacuum                           | on                        |      | sighup    | default
 pg_lake_iceberg.autovacuum_naptime                   | 600                       | s    | sighup    | default
 pg_lake_iceberg.default_avro_writer_block_size_kb    | 64                        | kB   | superuser | default
 pg_lake_iceberg.default_location_prefix              | s3://yassan-test/pg_lake/ |      | superuser | configuration file
 pg_lake_iceberg.enable_object_store_catalog          | on                        |      | sighup    | default
 pg_lake_iceberg.log_autovacuum_min_duration          | 600000                    | ms   | sighup    | default
 pg_lake_iceberg.object_store_catalog_location_prefix |                           |      | sighup    | default
 pg_lake_table.default_parquet_version                | v1                        |      | superuser | default
 pg_lake_table.enable_full_query_pushdown             | on                        |      | user      | default
 pg_lake_table.enable_strict_pushdown                 | on                        |      | user      | default
 pg_lake_table.target_file_size_mb                    | 512                       | MB   | user      | default
 pg_lake_table.target_row_group_size_mb               | 512                       | MB   | superuser | default
(15 rows)

postgres=# SHOW pg_lake_iceberg.manifest_min_count_to_merge;
 pg_lake_iceberg.manifest_min_count_to_merge 
---------------------------------------------
 100
(1 row)

postgres=# SHOW pg_lake_iceberg.target_manifest_size_kb;
 pg_lake_iceberg.target_manifest_size_kb 
-----------------------------------------
 8MB
(1 row)

postgres=# SHOW pg_lake_iceberg.max_snapshot_age;
 pg_lake_iceberg.max_snapshot_age 
----------------------------------
 1800
(1 row)

現状では更新頻度が高いIcebergテーブルの場合、自動VACUUMを無効化して、運用スケジュールに合わせて手動でVACUUMコマンドを実行するのが良さそうです。 また、VACUUMコマンドでパーティションの指定も出来ないのでVACUUMする範囲とテーブルの更新範囲が被るとVACUUMまたはクエリが失敗する可能性が出てくるので、自動VACUUMの頻度やクエリのリトライなどの考慮する必要があります。

では、次にハンズオンで実際に触ってみます。

ハンズオン

環境説明

オブジェクトストレージには、AWS S3互換の”さくらのオブジェクトストレージ”を利用します。

pg_lakeの実行環境はDocker Composeで構築します。 pg_lakeへ外から参照する環境として、Docker Composeの別プロジェクトを用意し、SparkとTrinoコンテナを起動します。

pg_lake環境の構築

pg_lakeのDockerイメージは公式から提供されていないので、自分でビルドします。 公式ドキュメント docker/LOCAL_DEV.md を参考にすればOKです。

TaskというTask Runnerを使う前提になっているので、まずはTaskをインストールしてください5

pg_lakeのイメージのビルド

pg_lakeのリポジトリをcloneし、docker/ まで移動します。 Taskをインストールした後に、task build:local を実行してコンテナイメージをビルドします。

pg_lakeの外から接続出来るようにする

pg_lakeコンテナの外から接続出来るように改変します。

pg_lake/docker/scripts/entrypoint-postgres.sh を以下のように変更します。

やっていることは pg_hba.conf で外からの接続を許可し、postgresql.conflisten_addresses='*' に変更しています。また、今回は横着して外から繋ぐ際のユーザとしてpostgresユーザを使うので、 postgres ユーザのパスワードを .envPOSTGRES_PASSWORDを使うようにしています6

▶ 🗒️ pg_lake/docker/scripts/entrypoint-postgres.shを表示

#!/bin/bash

set -euo pipefail

trap "echo 'Caught termination signal. Exiting...'; exit 0" SIGINT SIGTERM

# Ensure PGBASEDIR and PG_MAJOR are set
PGBASEDIR=${PGBASEDIR:-/home/postgres}
PG_MAJOR=${PG_MAJOR:-18}

# Create and fix permissions for temporary directory BEFORE starting PostgreSQL
# Docker volumes are created with root ownership, but postgres user needs write access
mkdir -p ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/base/pgsql_tmp
sudo chown -R postgres:postgres ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/base/pgsql_tmp
sudo chmod 700 ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/base/pgsql_tmp

# Update pg_hba.conf
# 外からつなぐようにオープンにする
# ローカルはpeer(コンテナ内でpostgresユーザなら無パスワードでOK)、
# それ以外はmd5/scramでパスワード必須
# サンプル用(ザル設定)
cat >"${PGBASEDIR}/pgsql-${PG_MAJOR}/data/pg_hba.conf" <<'HBA'
local   all             postgres                                peer
local   all             all                                     md5
host    all             all             127.0.0.1/32            md5
host    all             all             ::1/128                 md5
host    all             all             0.0.0.0/0               md5
HBA

# Update postgresql.conf
# !!IMPORTANT!!: NOT RECOMMENDED FOR PRODUCTION
# ALLOW ACCESS FROM ANY IP ADDRESS typically used for development to access the database from outside the container
echo "listen_addresses = '*'" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf
echo "port = 5432" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf
echo "shared_preload_libraries = 'pg_extension_base'" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf
echo "pg_lake_iceberg.default_location_prefix = 's3://yassan-test/pg_lake/'" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf
echo "pg_lake_engine.host = 'host=${PGBASEDIR}/pgduck_socket_dir port=5332'" | tee -a ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/postgresql.conf

# Start PostgreSQL server using explicit path
${PGBASEDIR}/pgsql-${PG_MAJOR}/bin/pg_ctl -D ${PGBASEDIR}/pgsql-${PG_MAJOR}/data start -l ${PGBASEDIR}/pgsql-${PG_MAJOR}/data/logfile

# --- postgres のパスワードを設定。.envのPOSTGRES_PASSWORDを使う ---
if [ -n "${POSTGRES_PASSWORD:-}" ]; then
    PW_ESCAPED=${POSTGRES_PASSWORD//\'/\'\'}
    psql -U postgres -v ON_ERROR_STOP=1 -c \
        "ALTER ROLE postgres WITH PASSWORD '${PW_ESCAPED}';"
fi

# Run initialization script using explicit path
${PGBASEDIR}/pgsql-${PG_MAJOR}/bin/psql -U postgres -f /init-postgres.sql

sleep infinity

.envPOSTGRES_PASSWORDを使うようにしたので、pg_lake/docker/.env を以下のように変更します。

PG_LAKE_REF=main
PG_MAJOR=18
POSTGRES_PASSWORD=StrongPassword_here

オブジェクトストレージをlocalstackからさくらのオブジェクトストレージに変更する

pg_lakeのDocker Composeファイルでは、オブジェクトストレージとしてlocalstackを使うようになっていますが、今回はさくらのオブジェクトストレージを使うように変更します7

pg_lake/docker/scripts/init-pgduck-server.sql を以下のように変更します。
こうすることでpg_lake起動時に、pgduck_serverが利用するオブジェクトストレージ用のS3シークレットが作成されます。

-- create s3 secret for sakura object storage
CREATE OR REPLACE SECRET sakura_s3 (
  TYPE s3,
  PROVIDER config,
  SCOPE 's3://yassan-test',
  USE_SSL true,
  KEY_ID '<アクセスキーID>',
  SECRET '<シークレットアクセスキー>',
  URL_STYLE 'vhost',   -- path-style じゃなくても良いよ
  ENDPOINT 's3.isk01.sakurastorage.jp',
  REGION 'jp-north-1'
);

⚠️実運用ではアクセスキーをSQLに直書きせず、環境変数経由で渡す・最小権限のキーを使用するなどの対策を推奨します8

Docker Composeファイルの変更

上記を踏まえて、Docker Composeファイルを以下のように変更します。

オブジェクトストレージにはさくらのクラウドのオブジェクトストレージを使うので、localstackのサービス定義をコメントアウトしています。

▶ 🗒️ 変更したdocker/docker-compose.ymlを表示

# https://knowledge.sakura.ad.jp/48152/#i-2
x-sakura-s3-workaround: &sakura_s3_workaround
  AWS_REQUEST_CHECKSUM_CALCULATION: "WHEN_REQUIRED"
  AWS_RESPONSE_CHECKSUM_VALIDATION: "WHEN_REQUIRED"

# pg_lakeの定義
services:
  pg_lake-postgres:
    image: pg_lake:local
    pull_policy: if_not_present
    container_name: pg_lake
    ports:
      - "5432:5432"
    volumes:
      - ./scripts/entrypoint-postgres.sh:/entrypoint-postgres.sh
      - ./scripts/init-postgres.sql:/init-postgres.sql
      - pgduck-unix-socket-volume:/home/postgres/pgduck_socket_dir
      - pg-shared-tmp-dir-volume:/home/postgres/pgsql-${PG_MAJOR:-18}/data/base/pgsql_tmp
    entrypoint: ["/entrypoint-postgres.sh"]
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "psql", "-c", "create table test(a int) using iceberg; drop table test;"]
      interval: 6s
      timeout: 2s
      start_period: 20s
      retries: 3
    env_file:
      - .env
    cap_add:
      - SYS_PTRACE
    depends_on:
      - pgduck-server

  pgduck-server:
    image: pgduck-server:local
    pull_policy: if_not_present
    container_name: pgduck-server
    # NOTE: pgduck-server only listens on Unix sockets, not TCP
    # To access DuckDB from the host, connect via pg_lake-postgres container which shares the Unix socket
    # Example: psql -h localhost -p 5432 -U postgres -c "SELECT * FROM duckdb_fdw.some_table"
    # IMPORTANT: Both containers must share the same temp directory volume so pgduck_server can access temp files
    volumes:
      - ./scripts/entrypoint-pgduck-server.sh:/entrypoint-pgduck-server.sh
      - ./scripts/init-pgduck-server.sql:/init-pgduck-server.sql
      - pgduck-unix-socket-volume:/home/postgres/pgduck_socket_dir
      - pg-shared-tmp-dir-volume:/home/postgres/pgsql-${PG_MAJOR:-18}/data/base/pgsql_tmp
    entrypoint: ["/entrypoint-pgduck-server.sh"]
    restart: unless-stopped
    environment:
      <<: *sakura_s3_workaround
    healthcheck:
      test: ["CMD", "psql", "-p", "5332", "-h", "/home/postgres/pgduck_socket_dir", "-c", "SELECT 1;"]
      interval: 6s
      timeout: 2s
      start_period: 20s
      retries: 3
    cap_add:
      - SYS_PTRACE
#    depends_on:
#      - localstack

#  localstack:
#    container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}"
#    image: localstack/localstack
#    pull_policy: if_not_present
#    ports:
#      - "127.0.0.1:4566:4566"            # LocalStack Gateway
#      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
#    environment:
#      # LocalStack configuration: https://docs.localstack.cloud/references/configuration/
#      - DEBUG=${DEBUG:-0}
#    volumes:
#      - "${LOCALSTACK_VOLUME_DIR:-./.volume}:/var/lib/localstack"
#      - "${PWD}/scripts/init-s3.sh:/etc/localstack/init/ready.d/init-s3.sh" 
#
volumes:
  pgduck-unix-socket-volume:
  pg-shared-tmp-dir-volume:  # Shared temp directory for both PostgreSQL and pgduck_server

後は、 task compose:up を実行すればpg_lake環境が起動します。

ケース0:Icebergテーブルを作成してみる

毎度おなじみ TLC Trip Record Data - TLC を使ってIcebergテーブルを作ってみます。

-- pg_lakeコンテナに入る
$ docker compose exec pg_lake-postgres bash
[postgres@52d96115560f ~]$ psql
psql (18.0)
Type "help" for help.

postgres=# \d
               List of relations
 Schema |       Name        | Type  |  Owner   
--------+-------------------+-------+----------
 public | geography_columns | view  | postgres
 public | geometry_columns  | view  | postgres
 public | spatial_ref_sys   | table | postgres
(3 rows)

-- Icebergテーブルを作成
-- スキーマ情報はParquetファイルから推測させ、初期データはParquetファイルを取り込む
postgres=# CREATE TABLE trips_yellow ()
USING iceberg
WITH (load_from = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet')
;
CREATE TABLE

-- Icebergテーブルが作成されたことを確認(外部テーブルとして作成されていた)
postgres=# \d+
                                              List of relations
 Schema |       Name        |     Type      |  Owner   | Persistence | Access method |  Size   | Description 
--------+-------------------+---------------+----------+-------------+---------------+---------+-------------
 public | geography_columns | view          | postgres | permanent   |               | 0 bytes | 
 public | geometry_columns  | view          | postgres | permanent   |               | 0 bytes | 
 public | spatial_ref_sys   | table         | postgres | permanent   | heap          | 6936 kB | 
 public | trips_yellow      | foreign table | postgres | permanent   |               | 68 MB   | 
(4 rows)

-- Icebergテーブルの中身を確認
postgres=# SELECT * FROM trips_yellow LIMIT 2 ;
 vendorid | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | ratecodeid | store_and_fwd_flag | pulocationid | dolocationid | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee | cbd_congestion_fee 
----------+----------------------+-----------------------+-----------------+---------------+------------+--------------------+--------------+--------------+--------------+-------------+-------+---------+------------+--------------+-----------------------+--------------+----------------------+-------------+--------------------
        2 | 2025-01-17 18:08:04  | 2025-01-17 18:19:55   |                 |             0 |            |                    |          161 |          186 |            0 |       13.22 |     0 |     0.5 |          0 |            0 |                     1 |        17.97 |                      |             |               0.75
        2 | 2025-01-17 18:16:20  | 2025-01-17 18:33:22   |                 |          0.01 |            |                    |          234 |          143 |            0 |       21.09 |     0 |     0.5 |          0 |            0 |                     1 |        25.84 |                      |             |               0.75
(2 rows)

-- Icebergテーブルの行数を確認
postgres=# SELECT count(1) FROM trips_yellow ;
  count  
---------
 3475226
(1 row)

-- Icebergテーブルにさらにデータを追加で読み込んでみる(S3バケット指定してもOK)
postgres=# COPY trips_yellow FROM 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet';
COPY 3577543

-- 行数が増えていることを確認
postgres=# SELECT count(1) FROM trips_yellow ;
  count  
---------
 7052769
(1 row)

-- Icebergテーブルの詳細を確認
postgres=# \d trips_yellow
                                Foreign table "public.trips_yellow"
        Column         |            Type             | Collation | Nullable | Default | FDW options 
-----------------------+-----------------------------+-----------+----------+---------+-------------
 vendorid              | integer                     |           |          |         | 
 tpep_pickup_datetime  | timestamp without time zone |           |          |         | 
 tpep_dropoff_datetime | timestamp without time zone |           |          |         | 
 passenger_count       | bigint                      |           |          |         | 
 trip_distance         | double precision            |           |          |         | 
 ratecodeid            | bigint                      |           |          |         | 
 store_and_fwd_flag    | text                        |           |          |         | 
 pulocationid          | integer                     |           |          |         | 
 dolocationid          | integer                     |           |          |         | 
 payment_type          | bigint                      |           |          |         | 
 fare_amount           | double precision            |           |          |         | 
 extra                 | double precision            |           |          |         | 
 mta_tax               | double precision            |           |          |         | 
 tip_amount            | double precision            |           |          |         | 
 tolls_amount          | double precision            |           |          |         | 
 improvement_surcharge | double precision            |           |          |         | 
 total_amount          | double precision            |           |          |         | 
 congestion_surcharge  | double precision            |           |          |         | 
 airport_fee           | double precision            |           |          |         | 
 cbd_congestion_fee    | double precision            |           |          |         | 
Server: pg_lake_iceberg
FDW options: (catalog 'postgres', location 's3://yassan-test/pg_lake/postgres/public/trips_yellow/18747')

postgres=# 

ケース1:PostgreSQLとして内部的にIcebergテーブルを参照

このケースでは、pg_lakeにJDBCでPostgreSQLとして接続し、pg_lake内のIcebergテーブルをSQLクエリしてみます。

クライアントのDocker Composeファイルは以下のようになります9

# https://knowledge.sakura.ad.jp/48152/#i-2
x-sakura-s3: &sakura_s3
  AWS_REQUEST_CHECKSUM_CALCULATION: "WHEN_REQUIRED"
  AWS_RESPONSE_CHECKSUM_VALIDATION: "WHEN_REQUIRED"
  BACKET_NAME: ${BACKET_NAME}
  AWS_ACCESS_KEY_ID:  ${AWS_ACCESS_KEY_ID}
  AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
  AWS_REGION: "jp-north-1"

# クライアントの定義
services:
  spark:
    image: docker.io/library/spark:4.0.1-python3
    container_name: spark
    ports:
      - "4040:4040"
    command: ["/bin/bash","-lc","sleep infinity"]
    # /opt/spark/ivy-cache が root:root で sparkユーザで書き込めないので
    user: root
    volumes:
      # spark.jars.packages のcache
      - spark-ivy:/opt/spark/ivy-cache
      # PySparkなどで使うスクリプト
      - ./scripts:/opt/spark/scripts
    environment:
      <<: *sakura_s3
      PATH: "/opt/spark/bin:${PATH}"
      # JDBC 接続先や資格情報は ENV で差し込む
      PGHOST: "host.docker.internal"
      PGPORT: "5432"
      PGDATABASE: "postgres"
      PG_USER: "postgres"
      PG_PASSWORD: "StrongPassword_here"

    restart: unless-stopped

  trino:
    image: trinodb/trino:479
    container_name: trino
    ports:
      - "8080:8080"
    volumes:
      # Trinoのカタログ設定を永続化
      - ./trino/log.properties:/etc/trino/log.properties:ro
      - ./trino/pgl.properties:/etc/trino/catalog/pgl.properties:ro
      - ./trino/pg_iceberg.properties:/etc/trino/catalog/pg_iceberg.properties:ro
    environment:
      <<: *sakura_s3
      CATALOG_MANAGEMENT: static
      # JDBC 接続先や資格情報は ENV で差し込む
      PGHOST: "host.docker.internal"
      PGPORT: "5432"
      PGDATABASE: "postgres"
      PG_USER: "postgres"
      PG_PASSWORD: "StrongPassword_here"

    restart: unless-stopped

volumes:
  spark-ivy:

TrinoからPostgreSQLとしてpg_lakeに接続してクエリ

TrinoからPostgreSQLとしてpg_lakeに接続するためのカタログ設定ファイルは以下のようになります。

▶ 📜 pg_lake_client/trino-catalog/pgl.properties を表示

connector.name=postgresql
connection-url=jdbc:postgresql://${ENV:PGHOST}:${ENV:PGPORT}/${ENV:PGDATABASE}
connection-user=${ENV:PG_USER}
connection-password=${ENV:PG_PASSWORD}

この状態で、ケース0で作成したIcebergテーブルをSQLクエリしてみます。

$ docker compose exec trino trino \
  --output-format=ALIGNED \
  --execute "select * from pgl.public.trips_yellow limit 2"

 vendorid |    tpep_pickup_datetime    |   tpep_dropoff_datetime    | passenger_count | trip_distance | ratecodeid | store_and_fwd_flag | pulocationid | dolocationid | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee | cbd_congestion_fee 
----------+----------------------------+----------------------------+-----------------+---------------+------------+--------------------+--------------+--------------+--------------+-------------+-------+---------+------------+--------------+-----------------------+--------------+----------------------+-------------+--------------------
        2 | 2025-01-17 18:08:04.000000 | 2025-01-17 18:19:55.000000 |            NULL |           0.0 |       NULL | NULL               |          161 |          186 |            0 |       13.22 |   0.0 |     0.5 |        0.0 |          0.0 |                   1.0 |        17.97 |                 NULL |        NULL |               0.75 
        2 | 2025-01-17 18:16:20.000000 | 2025-01-17 18:33:22.000000 |            NULL |          0.01 |       NULL | NULL               |          234 |          143 |            0 |       21.09 |   0.0 |     0.5 |        0.0 |          0.0 |                   1.0 |        25.84 |                 NULL |        NULL |               0.75 
(2 rows)

SparkからPostgreSQLとしてpg_lakeに接続してSQLクエリ

Trinoと同様にSparkでもPostgreSQLとしてpg_lakeに接続し、IcebergテーブルをSQLクエリしてみます。

$ docker compose exec spark spark-submit \
  --conf spark.log.level=WARN \
  --conf spark.jars.packages=org.postgresql:postgresql:42.7.8 \
  --conf spark.jars.ivy=/opt/spark/ivy-cache \
  /opt/spark/scripts/pglake_read.py

WARNING: Using incubator modules: jdk.incubator.vector
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /opt/spark/ivy-cache/cache
The jars for the packages stored in: /opt/spark/ivy-cache/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dc8745c5-8cf1-4ecc-97f7-f4fffb30ca0c;1.0
        confs: [default]
        found org.postgresql#postgresql;42.7.8 in central
        found org.checkerframework#checker-qual;3.49.5 in central
:: resolution report :: resolve 111ms :: artifacts dl 3ms
        :: modules in use:
        org.checkerframework#checker-qual;3.49.5 from central in [default]
        org.postgresql#postgresql;42.7.8 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-dc8745c5-8cf1-4ecc-97f7-f4fffb30ca0c
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/5ms)
26/01/03 11:52:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/03 11:52:53 INFO SparkContext: Running Spark version 4.0.1
26/01/03 11:52:53 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64
26/01/03 11:52:53 INFO SparkContext: Java version 17.0.17
Setting Spark log level to "WARN".
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|1       |2025-01-11 23:42:35 |2025-01-11 23:58:44  |1              |2.1          |1         |N                 |163         |48          |1           |13.5       |4.25 |0.5    |3.85      |0.0         |1.0                  |23.1        |2.5                 |0.0        |0.75              |
|1       |2025-01-11 23:50:09 |2025-01-11 23:57:53  |1              |1.0          |1         |N                 |148         |114         |1           |7.9        |4.25 |0.5    |2.75      |0.0         |1.0                  |16.4        |2.5                 |0.0        |0.75              |
|1       |2025-01-11 23:04:15 |2025-01-11 23:13:04  |1              |0.9          |1         |N                 |230         |162         |1           |8.6        |4.25 |0.5    |0.0       |0.0         |1.0                  |14.35       |2.5                 |0.0        |0.75              |
|1       |2025-01-11 23:59:14 |2025-01-12 00:07:42  |1              |1.3          |1         |N                 |114         |107         |1           |9.3        |3.25 |0.5    |2.8       |0.0         |1.0                  |16.85       |2.5                 |0.0        |0.75              |
|1       |2025-01-11 23:48:47 |2025-01-12 00:00:52  |2              |1.7          |1         |N                 |90          |79          |1           |12.1       |4.25 |0.5    |3.55      |0.0         |1.0                  |21.4        |2.5                 |0.0        |0.75              |
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
only showing top 5 rows

また、PySparkのスクリプトは以下のようになります。

▶ 📜 pg_lake_client/scripts/pglake_read.py を表示

import os
from pyspark.sql import SparkSession

# 環境変数から接続先を取得(compose の environment で渡す)
host = os.getenv("PGHOST", "127.0.0.1")
port = os.getenv("PGPORT", "5432")
db   = os.getenv("PGDATABASE", "postgres")
user = os.getenv("PG_USER", "postgres")
pw   = os.getenv("PG_PASSWORD", "")

url = f"jdbc:postgresql://{host}:{port}/{db}"

props = {
    "user": user,
    "password": pw,
    "driver": "org.postgresql.Driver",
    "fetchsize": "1000",
}

spark = SparkSession.builder.appName("pg_jdbc_read").getOrCreate()

table_or_query = "(SELECT * FROM public.trips_yellow ORDER BY 1 LIMIT 10000) t"

df = spark.read.jdbc(url=url, table=table_or_query, properties=props)
df.show(5, truncate=False)

spark.stop()

ケース2:Icebergカタログ(JDBC)としてSparkやTrinoからクエリ

このケースでは「PostgreSQLとして接続する」のではなく、pg_lake を Icebergのカタログ実装として扱う 検証します。

SparkからIcebergカタログとしてpg_lakeに接続してクエリ

$ export BUCKET_NAME=yassan-test
$ docker compose exec spark spark-submit \
  --conf spark.log.level=WARN \
  --conf spark.jars.packages=org.postgresql:postgresql:42.7.8,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \
  --conf spark.jars.ivy=/opt/spark/ivy-cache \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.defaultCatalog=postgres \
  --conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.postgres.type=jdbc \
  --conf spark.sql.catalog.postgres.uri=jdbc:postgresql://host.docker.internal:5432/postgres \
  --conf spark.sql.catalog.postgres.jdbc.user=postgres \
  --conf spark.sql.catalog.postgres.jdbc.password=StrongPassword_here \
  --conf spark.sql.catalog.postgres.jdbc.verifyServerCertificate=false \
  --conf spark.sql.catalog.postgres.jdbc.useSSL=false \
  --conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \
  --conf spark.sql.catalog.postgres.warehouse=s3://${BUCKET_NAME}/pg_lake \
  --conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf spark.sql.catalog.postgres.s3.endpoint=https://s3.isk01.sakurastorage.jp \
  /opt/spark/scripts/rest_read.py
WARNING: Using incubator modules: jdk.incubator.vector
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /opt/spark/ivy-cache/cache
The jars for the packages stored in: /opt/spark/ivy-cache/jars
org.postgresql#postgresql added as a dependency
org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-23e5b16c-74b8-4cae-8340-ecbbe0a7817f;1.0
        confs: [default]
        found org.postgresql#postgresql;42.7.8 in central
        found org.checkerframework#checker-qual;3.49.5 in central
        found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 in central
        found org.apache.iceberg#iceberg-aws-bundle;1.10.1 in central
:: resolution report :: resolve 175ms :: artifacts dl 9ms
        :: modules in use:
        org.apache.iceberg#iceberg-aws-bundle;1.10.1 from central in [default]
        org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 from central in [default]
        org.checkerframework#checker-qual;3.49.5 from central in [default]
        org.postgresql#postgresql;42.7.8 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-23e5b16c-74b8-4cae-8340-ecbbe0a7817f
        confs: [default]
        0 artifacts copied, 4 already retrieved (0kB/8ms)
26/02/01 00:41:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/01 00:41:51 INFO SparkContext: Running Spark version 4.0.1
26/02/01 00:41:51 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64
26/02/01 00:41:51 INFO SparkContext: Java version 17.0.17
Setting Spark log level to "WARN".
26/02/01 00:41:56 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|2       |2025-01-17 18:08:04 |2025-01-17 18:19:55  |NULL           |0.0          |NULL      |NULL              |161         |186         |0           |13.22      |0.0  |0.5    |0.0       |0.0         |1.0                  |17.97       |NULL                |NULL       |0.75              |
|2       |2025-01-17 18:16:20 |2025-01-17 18:33:22  |NULL           |0.01         |NULL      |NULL              |234         |143         |0           |21.09      |0.0  |0.5    |0.0       |0.0         |1.0                  |25.84       |NULL                |NULL       |0.75              |
|1       |2025-01-17 18:01:26 |2025-01-17 18:22:49  |NULL           |6.7          |NULL      |NULL              |231         |141         |0           |25.7       |0.0  |0.5    |0.0       |0.0         |1.0                  |30.45       |NULL                |NULL       |0.75              |
|2       |2025-01-17 18:51:18 |2025-01-17 19:11:39  |NULL           |0.0          |NULL      |NULL              |234         |239         |0           |-4.75      |0.0  |0.5    |0.0       |0.0         |1.0                  |6.37        |NULL                |NULL       |0.75              |
|2       |2025-01-17 18:14:51 |2025-01-17 18:25:51  |NULL           |0.0          |NULL      |NULL              |193         |179         |0           |10.04      |0.0  |0.5    |0.0       |0.0         |1.0                  |11.54       |NULL                |NULL       |0.0               |
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
only showing top 5 rows

一応、読めていますが、、JDBCカタログのスキーマバージョンがV0のままなので、以下のWARNが出ています。

26/02/01 00:41:56 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1

↑の実行コマンドにて、以下を足しています。

--conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \

V0 → V1 に変更して再実行すると、メタデータテーブルのスキーマが更新され、WARNも出なくなるはずなのですが、、
以下の様に自動マイグレーションを実行しようとしてエラーになります。

Setting Spark log level to "WARN".
Traceback (most recent call last):
  File "/opt/spark/scripts/rest_read.py", line 10, in <module>
    df = spark.read.format("iceberg").load("postgres.public.trips_yellow")
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 311, in load
  File "/opt/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 282, in deco
  File "/opt/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/protocol.py", line 327, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o48.load.
: org.apache.iceberg.jdbc.UncheckedSQLException: Cannot check and eventually update SQL schema
  ()
Caused by: org.postgresql.util.PSQLException: ERROR: ALTER action ADD COLUMN cannot be performed on relation "iceberg_tables"
  Detail: This operation is not supported for views.
  ()

PostgreSQL側でALTER TABLEができないのは、pg_lakeの pg_lake_iceberg 拡張が作成する iceberg_tables がビューとして扱われているのが原因のようです。その為、pg_lake以外のクエリエンジン(SparkやTrino)からJDBCカタログとして接続する場合は、スキーマバージョンをV0のまま使う必要があります10

したがって、少なくともこの組合せでは “Iceberg JDBC Catalog としての運用” は難しいです。

▶ 🗒️ pg_lake-postgresのPostgreSQLを見た際のログを表示

$ docker compose exec pg_lake-postgres bash
[postgres@ea3039d1b599 ~]$ psql
psql (18.1)
Type "help" for help.

postgres=# copy trips_yellow from 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet';
COPY 3577543
postgres=# SELECT
    table_schema,
    table_name,
    table_type
FROM information_schema.tables
WHERE table_schema = 'lake_iceberg'
  AND table_name = 'tables';
 table_schema | table_name | table_type 
--------------+------------+------------
 lake_iceberg | tables     | VIEW
(1 row)

postgres=# SELECT
    definition
FROM pg_views
WHERE schemaname = 'lake_iceberg'
  AND viewname = 'tables';
                                       definition                                        
-----------------------------------------------------------------------------------------
  SELECT (current_database())::text AS catalog_name,                                    +
     ((pg_class.relnamespace)::regnamespace)::character varying(255) AS table_namespace,+
     (pg_class.relname)::character varying(255) AS table_name,                          +
     tables_internal.metadata_location,                                                 +
     tables_internal.previous_metadata_location                                         +
    FROM (lake_iceberg.tables_internal                                                  +
      JOIN pg_class ON ((pg_class.oid = (tables_internal.table_name)::oid)))            +
 UNION ALL                                                                              +
  SELECT tables_external.catalog_name,                                                  +
     tables_external.table_namespace,                                                   +
     tables_external.table_name,                                                        +
     tables_external.metadata_location,                                                 +
     tables_external.previous_metadata_location                                         +
    FROM lake_iceberg.tables_external                                                   +
   WHERE ((tables_external.catalog_name)::text <> (current_database())::text);
(1 row)

postgres=# SELECT *
FROM lake_iceberg.tables
ORDER BY table_namespace, table_name;
 catalog_name | table_namespace |  table_name  |                                                      metadata_location                                                        |                                                 previous_metadata_location                                                    
--------------+-----------------+--------------+-------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------
 postgres     | public          | trips_yellow | s3://yassan-test/pg_lake/postgres/public/trips_yellow/18635/metadata/00002-505466b5-51e0-4282-9827-2d7cbaabb941.metadata.json | s3://yassan-test/pg_lake/postgres/public/trips_yellow/18635/metadata/00001-32f3d086-e001-464b-82d3-b4bbc313bdd5.metadata.json
(1 row)

pg_lakeのIcebergテーブルのドキュメントの Accessing Iceberg tables with Spark を見てると、org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1 で確認していてかなり古い。 JDBCカタログが使うDBスキーマが v1.6.0 で V0/V1の概念が導入され、V1でView対応していたりするので、そこが影響してそうです。

TrinoからIcebergカタログとしてpg_lakeに接続してクエリ

TrinoからIcebergカタログとしてpg_lakeに接続するためのコネクターの設定ファイルは以下のようになります。

▶ 📜 pg_lake_client/trino-catalog/pg_iceberg.properties を表示

connector.name=iceberg
iceberg.catalog.type=jdbc

iceberg.jdbc-catalog.catalog-name=postgres
iceberg.jdbc-catalog.driver-class=org.postgresql.Driver
iceberg.jdbc-catalog.connection-url=jdbc:postgresql://${ENV:PGHOST}:${ENV:PGPORT}/${ENV:PGDATABASE}
iceberg.jdbc-catalog.connection-user=${ENV:PG_USER}
iceberg.jdbc-catalog.connection-password=${ENV:PG_PASSWORD}
iceberg.jdbc-catalog.schema-version=V0
iceberg.jdbc-catalog.default-warehouse-dir=s3://${ENV:BUCKET_NAME}/pg_lake

fs.native-s3.enabled=true
s3.region=${ENV:AWS_REGION}
s3.endpoint=https://s3.isk01.sakurastorage.jp
s3.aws-access-key=${ENV:AWS_ACCESS_KEY_ID}
s3.aws-secret-key=${ENV:AWS_SECRET_ACCESS_KEY}

この状態で、ケース0で作成したIcebergテーブルをSQLクエリしてみます。

$ docker compose exec trino trino \
  --output-format=ALIGNED \
  --execute "select * from pg_iceberg.public.trips_yellow limit 2"

Query 20260201_010042_00000_etnxt failed: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1

以下の様にJDBCカタログのスキーマバージョンがV0を指定していますがエラーになります。

iceberg.jdbc-catalog.schema-version=V0

Trinoのログを確認すると、以下のWARNが出ていました。

trino  | 2026-02-01T01:00:42.833Z       WARN    Query-20260201_010042_00000_etnxt-162   org.apache.iceberg.jdbc.JdbcCatalog     JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
trino  | 2026-02-01T01:00:42.862Z       INFO    dispatcher-query-4      io.trino.event.QueryMonitor     TIMELINE: Query 20260201_010042_00000_etnxt :: FAILED (GENERIC_INTERNAL_ERROR) :: elapsed 405ms :: planning 0ms :: waiting 399ms :: scheduling 0ms :: running 0ms :: finishing 0ms :: begin 2026-02-01T01:00:42.442Z :: end 2026-02-01T01:00:42.847Z

Trinoの起動ログにも V0 の設定が反映されているようです

trino  | 2026-02-01T01:00:15.210Z       INFO    main    io.trino.bootstrap.catalog.pg_iceberg   iceberg.jdbc-catalog.schema-version               V1                V0                                 JDBC catalog schema version

まとめると、試したことから分かったことは以下の通り。

  • iceberg.jdbc-catalog.schema-version=V0 を設定しても、実行時に V1 相当の振る舞い(View対応前提)を求めるエラーとなった
  • 起動ログ上では V0 が設定値として表示されている

このことから以下と考えられる。

  • Trino側の Iceberg JDBC catalog 実装が実行時に V1 相当のスキーマを前提としている可能性
  • あるいは pg_lake 側の JDBC 互換メタ実装が Iceberg 側の期待するDDL変更に対応していない可能性

結論:

現状11では、SparkやTrinoからIcebergカタログとしてpg_lakeを扱うのは止めたほうが良さそうです。
そもそも、あえてIcebergカタログとして接続しなくても、利用しやすいPostgreSQL接続で十分だった。

追記(2026-02-23):

Trinoでエラーになる件は、以下の通り、Trino側のバグでした。これがマージされれば回避できそうです。

github.com

ケース3:S3上のファイルを直接参照したり、テーブルをS3に吐く

ここでは前提として s3://yassan-test/data-yellow_trip/ 以下に yellow_tripdata_YYYY-MM.parquetのParquetファイルがあるとします。

バケット上にあるParquetファイルをテーブルとして参照する

# pg_lakeコンテナに入る
$ docker compose exec pg_lake-postgres bash
[postgres@52d96115560f ~]$ psql

# 現状の確認
postgres=# \d
                   List of relations
 Schema |       Name        |     Type      |  Owner   
--------+-------------------+---------------+----------
 public | geography_columns | view          | postgres
 public | geometry_columns  | view          | postgres
 public | spatial_ref_sys   | table         | postgres
 public | trips_yellow      | foreign table | postgres
(4 rows)

# `s3://yassan-test/data-yellow_trip/` を参照する外部テーブルを作成
postgres=# CREATE FOREIGN TABLE pq_tbl_yellow_tripdata() 
SERVER pg_lake 
OPTIONS (path 's3://yassan-test/data-yellow_trip/*.parquet')
;
CREATE FOREIGN TABLE

# 外部テーブルの存在を確認
postgres=# \d
                     List of relations
 Schema |          Name          |     Type      |  Owner   
--------+------------------------+---------------+----------
 public | geography_columns      | view          | postgres
 public | geometry_columns       | view          | postgres
 public | pq_tbl_yellow_tripdata | foreign table | postgres
 public | spatial_ref_sys        | table         | postgres
 public | trips_yellow           | foreign table | postgres
(5 rows)

# 作成した外部テーブルの定義を確認するとParquetのメタデータを元に型を推測している事が分かる
postgres=# \d pq_tbl_yellow_tripdata
                           Foreign table "public.pq_tbl_yellow_tripdata"
        Column         |            Type             | Collation | Nullable | Default | FDW options 
-----------------------+-----------------------------+-----------+----------+---------+-------------
 vendorid              | integer                     |           |          |         | 
 tpep_pickup_datetime  | timestamp without time zone |           |          |         | 
 tpep_dropoff_datetime | timestamp without time zone |           |          |         | 
 passenger_count       | bigint                      |           |          |         | 
 trip_distance         | double precision            |           |          |         | 
 ratecodeid            | bigint                      |           |          |         | 
 store_and_fwd_flag    | text                        |           |          |         | 
 pulocationid          | integer                     |           |          |         | 
 dolocationid          | integer                     |           |          |         | 
 payment_type          | bigint                      |           |          |         | 
 fare_amount           | double precision            |           |          |         | 
 extra                 | double precision            |           |          |         | 
 mta_tax               | double precision            |           |          |         | 
 tip_amount            | double precision            |           |          |         | 
 tolls_amount          | double precision            |           |          |         | 
 improvement_surcharge | double precision            |           |          |         | 
 total_amount          | double precision            |           |          |         | 
 congestion_surcharge  | double precision            |           |          |         | 
 airport_fee           | double precision            |           |          |         | 
Server: pg_lake
FDW options: (path 's3://yassan-test/data-yellow_trip/*.parquet')

適当にクエリしてみます。

postgres=# select count(*) from pq_tbl_yellow_tripdata ;
  count   
----------
 81405872
(1 row)

postgres=# SELECT
  date_trunc('month', tpep_pickup_datetime) AS month,
  count(*) AS trips,
  sum(total_amount)::numeric(18,2) AS total_sales,
  avg(trip_distance) AS avg_distance
FROM pq_tbl_yellow_tripdata
WHERE tpep_pickup_datetime >= timestamp '2024-01-01'
  AND tpep_pickup_datetime <  timestamp '2025-11-01'
GROUP BY 1
ORDER BY 1;
        month        |  trips  | total_sales  |   avg_distance    
---------------------+---------+--------------+-------------------
 2024-01-01 00:00:00 | 2964617 |  79456198.67 | 3.652168742201813
 2024-02-01 00:00:00 | 3007533 |  80073797.21 | 3.860857653099632
 2024-03-01 00:00:00 | 3582611 |  97162539.11 | 4.517420758212249
 2024-04-01 00:00:00 | 3514295 |  96620076.26 | 5.283848157311692
 2024-05-01 00:00:00 | 3723843 | 105662491.74 | 5.367007577924154
 2024-06-01 00:00:00 | 3539170 |  98854290.75 | 5.222029280876485
 2024-07-01 00:00:00 | 3076876 |  86417596.46 | 5.111791086153442
 2024-08-01 00:00:00 | 2979192 |  84225308.56 | 4.944860284264796
 2024-09-01 00:00:00 | 3633025 | 103683171.72 |  5.74147173223362
 2024-10-01 00:00:00 | 3833780 | 108992892.43 | 5.123551205337354
 2024-11-01 00:00:00 | 3646372 | 100829501.83 | 5.369536251374905
 2024-12-01 00:00:00 | 3668371 | 103889576.09 | 5.087793451098265
 2025-01-01 00:00:00 | 3475236 |  89005350.57 | 5.855122296729696
 2025-02-01 00:00:00 | 3577543 |  89547719.11 | 6.025345118143267
 2025-03-01 00:00:00 | 4145231 | 108878221.10 | 6.584121432556175
 2025-04-01 00:00:00 | 3970568 | 105596876.87 | 6.994721848359639
 2025-05-01 00:00:00 | 4591844 | 123430419.10 | 7.653431418836262
 2025-06-01 00:00:00 | 4322949 | 118393545.05 | 7.478367660594344
 2025-07-01 00:00:00 | 3898971 | 104599459.73 | 7.104746062485573
 2025-08-01 00:00:00 | 3574080 |  94315391.11 |  7.08938401770422
 2025-09-01 00:00:00 | 4251019 | 117809613.95 | 6.839372030563955
 2025-10-01 00:00:00 | 4428687 | 119457540.64 | 6.696755654665819
(22 rows)

上記のようにバケットにあるParquetファイルをそのままにクエリ出来ていることが分かります。

ParquetファイルをIcebergテーブルにImportする

s3://yassan-test/data-yellow_trip/*.parquet をIcebergテーブルにImportしてみます。

-- コピー先のIcebergテーブルを作成
postgres=# CREATE TABLE iceberg_yellow_tripdata
(LIKE pq_tbl_yellow_tripdata)
USING iceberg;
CREATE TABLE

postgres=# \d iceberg_yellow_tripdata
                           Foreign table "public.iceberg_yellow_tripdata"
        Column         |            Type             | Collation | Nullable | Default | FDW options 
-----------------------+-----------------------------+-----------+----------+---------+-------------
 vendorid              | integer                     |           |          |         | 
 tpep_pickup_datetime  | timestamp without time zone |           |          |         | 
 tpep_dropoff_datetime | timestamp without time zone |           |          |         | 
 passenger_count       | bigint                      |           |          |         | 
 trip_distance         | double precision            |           |          |         | 
 ratecodeid            | bigint                      |           |          |         | 
 store_and_fwd_flag    | text                        |           |          |         | 
 pulocationid          | integer                     |           |          |         | 
 dolocationid          | integer                     |           |          |         | 
 payment_type          | bigint                      |           |          |         | 
 fare_amount           | double precision            |           |          |         | 
 extra                 | double precision            |           |          |         | 
 mta_tax               | double precision            |           |          |         | 
 tip_amount            | double precision            |           |          |         | 
 tolls_amount          | double precision            |           |          |         | 
 improvement_surcharge | double precision            |           |          |         | 
 total_amount          | double precision            |           |          |         | 
 congestion_surcharge  | double precision            |           |          |         | 
 airport_fee           | double precision            |           |          |         | 
Server: pg_lake_iceberg
FDW options: (catalog 'postgres', location 's3://yassan-test/pg_lake/postgres/public/iceberg_yellow_tripdata/28585')


-- Parquetファイルを参照している外部テーブルを作成したIcebergテーブルにCOPYする
postgres=# COPY iceberg_yellow_tripdata FROM 's3://yassan-test/data-yellow_trip/*.parquet';
COPY 81405872

postgres=# select count(*) from iceberg_yellow_tripdata ;
  count   
----------
 81405872
(1 row)

-- 同じクエリしてみる
postgres=# SELECT
  date_trunc('month', tpep_pickup_datetime) AS month,
  count(*) AS trips,
  sum(total_amount)::numeric(18,2) AS total_sales,
  avg(trip_distance) AS avg_distance
FROM iceberg_yellow_tripdata
WHERE tpep_pickup_datetime >= timestamp '2024-01-01'
  AND tpep_pickup_datetime <  timestamp '2025-11-01'
GROUP BY 1
ORDER BY 1;
        month        |  trips  | total_sales  |   avg_distance    
---------------------+---------+--------------+-------------------
 2024-01-01 00:00:00 | 2964617 |  79456198.67 | 3.652168742201802
 2024-02-01 00:00:00 | 3007533 |  80073797.21 | 3.860857653099784
 2024-03-01 00:00:00 | 3582611 |  97162539.11 | 4.517420758212385
 2024-04-01 00:00:00 | 3514295 |  96620076.26 | 5.283848157311756
 2024-05-01 00:00:00 | 3723843 | 105662491.74 | 5.367007577924266
 2024-06-01 00:00:00 | 3539170 |  98854290.75 | 5.222029280876606
 2024-07-01 00:00:00 | 3076876 |  86417596.46 | 5.111791086153619
 2024-08-01 00:00:00 | 2979192 |  84225308.56 | 4.944860284264946
 2024-09-01 00:00:00 | 3633025 | 103683171.72 | 5.741471732234034
 2024-10-01 00:00:00 | 3833780 | 108992892.43 | 5.123551205337748
 2024-11-01 00:00:00 | 3646372 | 100829501.83 | 5.369536251375254
 2024-12-01 00:00:00 | 3668371 | 103889576.09 | 5.087793451098546
 2025-01-01 00:00:00 | 3475236 |  89005350.57 | 5.855122296729983
 2025-02-01 00:00:00 | 3577543 |  89547719.11 | 6.025345118143695
 2025-03-01 00:00:00 | 4145231 | 108878221.10 | 6.584121432556849
 2025-04-01 00:00:00 | 3970568 | 105596876.87 | 6.994721848359854
 2025-05-01 00:00:00 | 4591844 | 123430419.10 |  7.65343141883677
 2025-06-01 00:00:00 | 4322949 | 118393545.05 |  7.47836766059443
 2025-07-01 00:00:00 | 3898971 | 104599459.73 | 7.104746062486313
 2025-08-01 00:00:00 | 3574080 |  94315391.11 | 7.089384017704826
 2025-09-01 00:00:00 | 4251019 | 117809613.95 | 6.839372030564524
 2025-10-01 00:00:00 | 4428687 | 119457540.64 | 6.696755654666752
(22 rows)

Icebergテーブルのクエリ結果をS3に出力する

次に、集計したクエリをParquetファイルに出力してみます。 以下の通り、COPY (クエリ) TO 's3://bucket/to/prefix/hoge.parquet' で出力出来る事が分かる。

-- 集計クエリをバケットにParquetファイルで出力する
COPY (
  SELECT
    date_trunc('month', tpep_pickup_datetime) AS month,
    count(*) AS trips,
    sum(total_amount)::numeric(18,2) AS total_sales,
    avg(trip_distance) AS avg_distance
  FROM iceberg_yellow_tripdata
  WHERE tpep_pickup_datetime >= timestamp '2024-01-01'
    AND tpep_pickup_datetime <  timestamp '2025-11-01'
  GROUP BY 1
  ORDER BY 1
)
TO 's3://yassan-test/marts/yellow_monthly_202401_202510.parquet';

-- 出来てるか確認する為、出力先を外部参照してクエリしてみる
CREATE FOREIGN TABLE pq_tbl_yellow_monthly() 
SERVER pg_lake 
OPTIONS (path 's3://yassan-test/marts/yellow_monthly_*.parquet')
;

postgres=# \d pq_tbl_yellow_monthly
                       Foreign table "public.pq_tbl_yellow_monthly"
    Column    |            Type             | Collation | Nullable | Default | FDW options 
--------------+-----------------------------+-----------+----------+---------+-------------
 month        | timestamp without time zone |           |          |         | 
 trips        | bigint                      |           |          |         | 
 total_sales  | numeric(18,2)               |           |          |         | 
 avg_distance | double precision            |           |          |         | 
Server: pg_lake
FDW options: (path 's3://yassan-test/marts/yellow_monthly_*.parquet')

postgres=# select count(*) from pq_tbl_yellow_monthly ;
 count 
-------
    22
(1 row)

postgres=# select * from pq_tbl_yellow_monthly ;
        month        |  trips  | total_sales  |   avg_distance    
---------------------+---------+--------------+-------------------
 2024-01-01 00:00:00 | 2964617 |  79456198.67 |  3.65216874220182
 2024-02-01 00:00:00 | 3007533 |  80073797.21 | 3.860857653099766
 2024-03-01 00:00:00 | 3582611 |  97162539.11 | 4.517420758212356
 2024-04-01 00:00:00 | 3514295 |  96620076.26 | 5.283848157311716
 2024-05-01 00:00:00 | 3723843 | 105662491.74 | 5.367007577924155
 2024-06-01 00:00:00 | 3539170 |  98854290.75 |  5.22202928087656
 2024-07-01 00:00:00 | 3076876 |  86417596.46 | 5.111791086153628
 2024-08-01 00:00:00 | 2979192 |  84225308.56 | 4.944860284264978
 2024-09-01 00:00:00 | 3633025 | 103683171.72 | 5.741471732234106
 2024-10-01 00:00:00 | 3833780 | 108992892.43 | 5.123551205337798
 2024-11-01 00:00:00 | 3646372 | 100829501.83 | 5.369536251375229
 2024-12-01 00:00:00 | 3668371 | 103889576.09 | 5.087793451098548
 2025-01-01 00:00:00 | 3475236 |  89005350.57 | 5.855122296730121
 2025-02-01 00:00:00 | 3577543 |  89547719.11 | 6.025345118143695
 2025-03-01 00:00:00 | 4145231 | 108878221.10 | 6.584121432556826
 2025-04-01 00:00:00 | 3970568 | 105596876.87 | 6.994721848359891
 2025-05-01 00:00:00 | 4591844 | 123430419.10 | 7.653431418837025
 2025-06-01 00:00:00 | 4322949 | 118393545.05 | 7.478367660594162
 2025-07-01 00:00:00 | 3898971 | 104599459.73 |  7.10474606248644
 2025-08-01 00:00:00 | 3574080 |  94315391.11 |  7.08938401770497
 2025-09-01 00:00:00 | 4251019 | 117809613.95 | 6.839372030564667
 2025-10-01 00:00:00 | 4428687 | 119457540.64 | 6.696755654666566
(22 rows)

せっかくなので、Icebergテーブル iceberg_yellow_tripdata のメタデータをちょっと覗いてみます。

$ export BUCKET_NAME=yassan-test
$ docker compose exec spark spark-submit \
  --conf spark.log.level=WARN \
  --conf spark.jars.packages=org.postgresql:postgresql:42.7.8,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \
  --conf spark.jars.ivy=/opt/spark/ivy-cache \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.defaultCatalog=postgres \
  --conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.postgres.type=jdbc \
  --conf spark.sql.catalog.postgres.uri=jdbc:postgresql://host.docker.internal:5432/postgres \
  --conf spark.sql.catalog.postgres.jdbc.user=postgres \
  --conf spark.sql.catalog.postgres.jdbc.password=StrongPassword_here \
  --conf spark.sql.catalog.postgres.jdbc.verifyServerCertificate=false \
  --conf spark.sql.catalog.postgres.jdbc.useSSL=false \
  --conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \
  --conf spark.sql.catalog.postgres.warehouse=s3://${BUCKET_NAME}/pg_lake \
  --conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf spark.sql.catalog.postgres.s3.endpoint=https://s3.isk01.sakurastorage.jp \
  /opt/spark/scripts/check_iceberg_tbl_metadata.py \
  postgres.public.iceberg_yellow_tripdata

▶ 🗒️ postgres.public.iceberg_yellow_tripdata のメタデータの状態を確認した際の実行ログ

$ docker compose exec spark spark-submit \
  --conf spark.log.level=WARN \
  --conf spark.jars.packages=org.postgresql:postgresql:42.7.8,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \
  --conf spark.jars.ivy=/opt/spark/ivy-cache \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.defaultCatalog=postgres \
  --conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.postgres.type=jdbc \
  --conf spark.sql.catalog.postgres.uri=jdbc:postgresql://host.docker.internal:5432/postgres \
  --conf spark.sql.catalog.postgres.jdbc.user=postgres \
  --conf spark.sql.catalog.postgres.jdbc.password=StrongPassword_here \
  --conf spark.sql.catalog.postgres.jdbc.verifyServerCertificate=false \
  --conf spark.sql.catalog.postgres.jdbc.useSSL=false \
  --conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \
  --conf spark.sql.catalog.postgres.warehouse=s3://${BUCKET_NAME}/pg_lake \
  --conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf spark.sql.catalog.postgres.s3.endpoint=https://s3.isk01.sakurastorage.jp \
  /opt/spark/scripts/check_iceberg_tbl_metadata.py \
  postgres.public.iceberg_yellow_tripdata
WARNING: Using incubator modules: jdk.incubator.vector
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /opt/spark/ivy-cache/cache
The jars for the packages stored in: /opt/spark/ivy-cache/jars
org.postgresql#postgresql added as a dependency
org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3c034c47-bdd7-40a6-9635-977db416a74f;1.0
        confs: [default]
        found org.postgresql#postgresql;42.7.8 in central
        found org.checkerframework#checker-qual;3.49.5 in central
        found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 in central
        found org.apache.iceberg#iceberg-aws-bundle;1.10.1 in central
:: resolution report :: resolve 145ms :: artifacts dl 5ms
        :: modules in use:
        org.apache.iceberg#iceberg-aws-bundle;1.10.1 from central in [default]
        org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 from central in [default]
        org.checkerframework#checker-qual;3.49.5 from central in [default]
        org.postgresql#postgresql;42.7.8 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3c034c47-bdd7-40a6-9635-977db416a74f
        confs: [default]
        0 artifacts copied, 4 already retrieved (0kB/9ms)
26/02/22 01:01:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/22 01:01:12 INFO SparkContext: Running Spark version 4.0.1
26/02/22 01:01:12 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64
26/02/22 01:01:12 INFO SparkContext: Java version 17.0.17
Setting Spark log level to "WARN".
26/02/22 01:01:17 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
現在のデフォルトカタログ: postgres

==============================================================================================================
[HISTORY]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.history ORDER BY made_current_at DESC

-- 結果 --
+--------------------+-------------------+---------+-------------------+
|     made_current_at|        snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2026-02-21 09:15:...|5403871763539957193|       -1|               true|
+--------------------+-------------------+---------+-------------------+


==============================================================================================================
[SNAPSHOTS]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.snapshots ORDER BY committed_at DESC

-- 結果 --
+--------------------+-------------------+---------+---------+------------------+-------+
|        committed_at|        snapshot_id|parent_id|operation|     manifest_list|summary|
+--------------------+-------------------+---------+---------+------------------+-------+
|2026-02-21 09:15:...|5403871763539957193|       -1|   append|s3://yassan-tes...|     {}|
+--------------------+-------------------+---------+---------+------------------+-------+


==============================================================================================================
[METADATA_LOG_ENTRIES]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.metadata_log_entries ORDER BY timestamp DESC

-- 結果 --
+--------------------+------------------+-------------------+----------------+----------------------+
|           timestamp|              file| latest_snapshot_id|latest_schema_id|latest_sequence_number|
+--------------------+------------------+-------------------+----------------+----------------------+
|2026-02-21 09:15:...|s3://yassan-tes...|5403871763539957193|               0|                     1|
|2026-02-21 09:12:...|s3://yassan-tes...|               NULL|            NULL|                  NULL|
+--------------------+------------------+-------------------+----------------+----------------------+


==============================================================================================================
[MANIFESTS]
SELECT content, path, length, partition_spec_id, added_snapshot_id FROM postgres.public.iceberg_yellow_tripdata.manifests ORDER BY added_snapshot_id DESC

-- 結果 --
+-------+------------------+------+-----------------+-------------------+
|content|              path|length|partition_spec_id|  added_snapshot_id|
+-------+------------------+------+-----------------+-------------------+
|      0|s3://yassan-tes...|  8691|                0|5403871763539957193|
+-------+------------------+------+-----------------+-------------------+


==============================================================================================================
[FILES (データファイル)]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.files

-- 結果 --
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+
|content|         file_path|file_format|spec_id|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts|        lower_bounds|        upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|first_row_id|referenced_data_file|content_offset|content_size_in_bytes|    readable_metrics|
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+
|      0|s3://yassan-tes...|    PARQUET|      0|    25322861|         518520071|        NULL|        NULL|             NULL|            NULL|{1 -> [01 00 00 0...|{1 -> [07 00 00 0...|        NULL|         NULL|        NULL|         NULL|        NULL|                NULL|          NULL|                 NULL|{{NULL, NULL, NUL...|
|      0|s3://yassan-tes...|    PARQUET|      0|    25328936|         520064704|        NULL|        NULL|             NULL|            NULL|{1 -> [01 00 00 0...|{1 -> [07 00 00 0...|        NULL|         NULL|        NULL|         NULL|        NULL|                NULL|          NULL|                 NULL|{{NULL, NULL, NUL...|
|      0|s3://yassan-tes...|    PARQUET|      0|     4206302|          85638572|        NULL|        NULL|             NULL|            NULL|{1 -> [01 00 00 0...|{1 -> [07 00 00 0...|        NULL|         NULL|        NULL|         NULL|        NULL|                NULL|          NULL|                 NULL|{{NULL, NULL, NUL...|
|      0|s3://yassan-tes...|    PARQUET|      0|    26547773|         546084901|        NULL|        NULL|             NULL|            NULL|{1 -> [01 00 00 0...|{1 -> [06 00 00 0...|        NULL|         NULL|        NULL|         NULL|        NULL|                NULL|          NULL|                 NULL|{{NULL, NULL, NUL...|
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+


==============================================================================================================
[ENTRIES (マニフェストエントリ)]
SELECT * FROM postgres.public.iceberg_yellow_tripdata.entries

-- 結果 --
+------+-------------------+---------------+--------------------+------------------+--------------------+
|status|        snapshot_id|sequence_number|file_sequence_number|         data_file|    readable_metrics|
+------+-------------------+---------------+--------------------+------------------+--------------------+
|     1|5403871763539957193|              1|                   1|{0, s3://yassan...|{{NULL, NULL, NUL...|
|     1|5403871763539957193|              1|                   1|{0, s3://yassan...|{{NULL, NULL, NUL...|
|     1|5403871763539957193|              1|                   1|{0, s3://yassan...|{{NULL, NULL, NUL...|
|     1|5403871763539957193|              1|                   1|{0, s3://yassan...|{{NULL, NULL, NUL...|
+------+-------------------+---------------+--------------------+------------------+--------------------+

▶ 🗒️ check_iceberg_tbl_metadata.py の中身

#!/usr/bin/env python3
# /opt/spark/scripts/check_iceberg_tbl_metadata.py
#
# このスクリプトは、spark-submit --conf で設定された
# Iceberg JDBC Catalog / S3 / 拡張設定を前提に動作する。
# ここではカタログ設定を一切行わない(起動時設定をそのまま使用)。
#
# 使い方例:
#   spark-submit ... /opt/spark/scripts/check_iceberg_tbl_metadata.py \
#       postgres.public.iceberg_yellow_tripdata
#
# 必須:
#   テーブル名は catalog.schema.table 形式で渡すこと。
#   例: postgres.public.iceberg_yellow_tripdata

import argparse
from typing import List, Tuple

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException


def make_queries(base: str) -> List[Tuple[str, str]]:
    """
    Iceberg のメタデータテーブル一覧を生成する。
    Iceberg Spark では以下の形式で参照できる:

      <table>.history
      <table>.metadata_log_entries
      <table>.snapshots
      <table>.entries
      <table>.files
      <table>.manifests
    """
    return [
        ("HISTORY",
         f"SELECT * FROM {base}.history ORDER BY made_current_at DESC"),
        ("SNAPSHOTS",
         f"SELECT * FROM {base}.snapshots ORDER BY committed_at DESC"),
        ("METADATA_LOG_ENTRIES",
         f"SELECT * FROM {base}.metadata_log_entries ORDER BY timestamp DESC"),
        ("MANIFESTS",
         f"SELECT content, path, length, partition_spec_id, added_snapshot_id FROM {base}.manifests ORDER BY added_snapshot_id DESC"),
        ("FILES (データファイル)",
         f"SELECT * FROM {base}.files"),
        ("ENTRIES (マニフェストエントリ)",
         f"SELECT * FROM {base}.entries"),
    ]


def run_queries(
    spark: SparkSession,
    queries: List[Tuple[str, str]],
    limit: int,
) -> int:
    """
    メタデータクエリを順番に実行する。
    失敗したクエリ数を返す。
    """
    failed = 0

    for title, sql in queries:
        print("\n" + "=" * 110)
        print(f"[{title}]")
        print(sql)

        try:
            df = spark.sql(sql)

            # print("\n-- スキーマ --")
            # df.printSchema()

            print("\n-- 結果 --")
            df.show(limit, truncate=True)

        except AnalysisException as e:
            failed += 1
            print(f"\n!! AnalysisException: {e.desc}")
        except Exception as e:
            failed += 1
            print(f"\n!! ERROR: {type(e).__name__}: {e}")

    return failed


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Iceberg テーブルのメタデータ(history/snapshots/files など)を確認するスクリプト"
    )
    parser.add_argument(
        "table",
        help="対象テーブル名(catalog.schema.table 形式)"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=100,
        help="各メタデータクエリの表示行数(デフォルト: 100)"
    )

    args = parser.parse_args()
    
    spark = (
        SparkSession.builder.appName(f"check_iceberg_tbl_metadata:{args.table}")
        .getOrCreate()
    )

    # 現在のデフォルトカタログ確認(環境トラブル切り分け用)
    try:
        current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
        print(f"現在のデフォルトカタログ: {current_catalog}")
    except Exception:
        # Sparkバージョンによっては未対応のため無視
        pass

    queries = make_queries(f"{args.table}")
    failed = run_queries(spark, queries, args.limit)

    spark.stop()

    if failed:
        raise SystemExit(f"{failed} 件のメタデータクエリが失敗しました。")


if __name__ == "__main__":
    main()

ケース4:PostgreSQLってことはrollbackできるの?

pg_lakeの Iceberg tablesのBackupsの項目 を見ると、BEGINを使っていることが分かります。 pg_lakeはPostgreSQLなのでトランザクションが効くはず。という事は、Rollback出来るはずで、その場合、失敗して戻した際にIcebergテーブルとしてメタデータがどうなってるか見てみたい。

単純な1PCでお試ししてみます。
シナリオとしては、Icebergテーブルに対して、COPY ... FROM で、Icebergテーブルに複数COPYを実行し、途中でコケさせて、全部戻せるのかについて確認。

-- pg_lakeでIcebergテーブルを作る
CREATE TABLE tbl_copy_txn ()
USING iceberg
WITH (load_from = 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-01.parquet')
;

-- トランザクション開始
BEGIN;

-- 件数チェック
SELECT count(*) FROM tbl_copy_txn ;

-- データ投入(成功)
COPY tbl_copy_txn FROM 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-02.parquet';

-- 件数チェック
SELECT count(*) FROM tbl_copy_txn ;

-- データ投入(失敗)
COPY tbl_copy_txn FROM 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-99.parquet';

ROLLBACK;

-- 件数チェック(テーブル作成時の件数になってるはず)
SELECT count(*) FROM tbl_copy_txn ;

実際にやってみると、以下の様に戻せる事が分かる。

postgres=# CREATE TABLE tbl_copy_txn ()
USING iceberg
WITH (load_from = 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-01.parquet')
;
CREATE TABLE

postgres=# \d tbl_copy_txn
                                Foreign table "public.tbl_copy_txn"
        Column         |            Type             | Collation | Nullable | Default | FDW options 
-----------------------+-----------------------------+-----------+----------+---------+-------------
 vendorid              | integer                     |           |          |         | 
 tpep_pickup_datetime  | timestamp without time zone |           |          |         | 
 tpep_dropoff_datetime | timestamp without time zone |           |          |         | 
 passenger_count       | bigint                      |           |          |         | 
 trip_distance         | double precision            |           |          |         | 
 ratecodeid            | bigint                      |           |          |         | 
 store_and_fwd_flag    | text                        |           |          |         | 
 pulocationid          | integer                     |           |          |         | 
 dolocationid          | integer                     |           |          |         | 
 payment_type          | bigint                      |           |          |         | 
 fare_amount           | double precision            |           |          |         | 
 extra                 | double precision            |           |          |         | 
 mta_tax               | double precision            |           |          |         | 
 tip_amount            | double precision            |           |          |         | 
 tolls_amount          | double precision            |           |          |         | 
 improvement_surcharge | double precision            |           |          |         | 
 total_amount          | double precision            |           |          |         | 
 congestion_surcharge  | double precision            |           |          |         | 
 airport_fee           | double precision            |           |          |         | 
 cbd_congestion_fee    | double precision            |           |          |         | 
Server: pg_lake_iceberg
FDW options: (catalog 'postgres', location 's3://yassan-test/pg_lake/postgres/public/tbl_copy_txn/58579')

-- ここからトランザクション開始
postgres=# BEGIN;
BEGIN
postgres=*# SELECT count(*) FROM tbl_copy_txn ;
  count  
---------
 3475226
(1 row)

-- データ投入
postgres=*# COPY tbl_copy_txn FROM 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-02.parquet';
COPY 3577543

postgres=*# SELECT count(*) FROM tbl_copy_txn ;
  count  
---------
 7052769
(1 row)

-- あり得ないパスのデータを指定して失敗させる
postgres=*# COPY tbl_copy_txn FROM 's3://yassan-test/data-yellow_trip/yellow_tripdata_2025-99.parquet';
ERROR:  HTTP Error: Unable to connect to URL "https://s3.isk01.sakurastorage.jp/yassan-test/data-yellow_trip/yellow_tripdata_2025-99.parquet": 404 (Not Found).

LINE 1: ..., cbd_congestion_fee::DOUBLE AS cbd_congestion_fee FROM read_parquet('s3://yassan-test/data-yellow_trip/yellow_tr...
                                                                   ^

-- ロールバック実施
postgres=!# ROLLBACK;
ROLLBACK

-- テーブル作成時のレコード数に戻っている 🙌
postgres=# SELECT count(*) FROM tbl_copy_txn ;
  count  
---------
 3475226
(1 row)

更にSpark使ってメタデータをチェックしてみると、失敗手前に成功していたCOPYも反映されていないこと(=テーブル作成時の状態)が分かります。

▶ 🗒️Sparkで確認した際のログを表示

$ docker compose exec spark spark-submit \
  --conf spark.log.level=WARN \
  --conf spark.jars.packages=org.postgresql:postgresql:42.7.8,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1 \
  --conf spark.jars.ivy=/opt/spark/ivy-cache \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.defaultCatalog=postgres \
  --conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.postgres.type=jdbc \
  --conf spark.sql.catalog.postgres.uri=jdbc:postgresql://host.docker.internal:5432/postgres \
  --conf spark.sql.catalog.postgres.jdbc.user=postgres \
  --conf spark.sql.catalog.postgres.jdbc.password=StrongPassword_here \
  --conf spark.sql.catalog.postgres.jdbc.verifyServerCertificate=false \
  --conf spark.sql.catalog.postgres.jdbc.useSSL=false \
  --conf spark.sql.catalog.postgres.jdbc.schema-version=V0 \
  --conf spark.sql.catalog.postgres.warehouse=s3://${BUCKET_NAME}/pg_lake \
  --conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf spark.sql.catalog.postgres.s3.endpoint=https://s3.isk01.sakurastorage.jp \
  /opt/spark/scripts/check_iceberg_tbl_metadata.py \
  postgres.public.tbl_copy_txn
WARNING: Using incubator modules: jdk.incubator.vector
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /opt/spark/ivy-cache/cache
The jars for the packages stored in: /opt/spark/ivy-cache/jars
org.postgresql#postgresql added as a dependency
org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-13257d86-6fbd-4da8-9afe-141a2fad36ee;1.0
        confs: [default]
        found org.postgresql#postgresql;42.7.8 in spark-list
        found org.checkerframework#checker-qual;3.49.5 in central
        found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 in central
        found org.apache.iceberg#iceberg-aws-bundle;1.10.1 in central
:: resolution report :: resolve 258ms :: artifacts dl 15ms
        :: modules in use:
        org.apache.iceberg#iceberg-aws-bundle;1.10.1 from central in [default]
        org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.1 from central in [default]
        org.checkerframework#checker-qual;3.49.5 from central in [default]
        org.postgresql#postgresql;42.7.8 from spark-list in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-13257d86-6fbd-4da8-9afe-141a2fad36ee
        confs: [default]
        0 artifacts copied, 4 already retrieved (0kB/10ms)
26/02/22 02:27:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/22 02:27:12 INFO SparkContext: Running Spark version 4.0.1
26/02/22 02:27:12 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64
26/02/22 02:27:12 INFO SparkContext: Java version 17.0.17
Setting Spark log level to "WARN".
26/02/22 02:27:18 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
現在のデフォルトカタログ: postgres

==============================================================================================================
[HISTORY]
SELECT * FROM postgres.public.tbl_copy_txn.history ORDER BY made_current_at DESC

-- 結果 --
+--------------------+-------------------+---------+-------------------+
|     made_current_at|        snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2026-02-22 02:15:...|2538362807379914821|       -1|               true|
+--------------------+-------------------+---------+-------------------+


==============================================================================================================
[SNAPSHOTS]
SELECT * FROM postgres.public.tbl_copy_txn.snapshots ORDER BY committed_at DESC

-- 結果 --
+--------------------+-------------------+---------+---------+------------------+-------+
|        committed_at|        snapshot_id|parent_id|operation|     manifest_list|summary|
+--------------------+-------------------+---------+---------+------------------+-------+
|2026-02-22 02:15:...|2538362807379914821|       -1|   append|s3://yassan-tes...|     {}|
+--------------------+-------------------+---------+---------+------------------+-------+


==============================================================================================================
[METADATA_LOG_ENTRIES]
SELECT * FROM postgres.public.tbl_copy_txn.metadata_log_entries ORDER BY timestamp DESC

-- 結果 --
+--------------------+------------------+-------------------+----------------+----------------------+
|           timestamp|              file| latest_snapshot_id|latest_schema_id|latest_sequence_number|
+--------------------+------------------+-------------------+----------------+----------------------+
|2026-02-22 02:15:...|s3://yassan-tes...|2538362807379914821|               0|                     0|
+--------------------+------------------+-------------------+----------------+----------------------+


==============================================================================================================
[MANIFESTS]
SELECT content, path, length, partition_spec_id, added_snapshot_id FROM postgres.public.tbl_copy_txn.manifests ORDER BY added_snapshot_id DESC

-- 結果 --
+-------+------------------+------+-----------------+-------------------+
|content|              path|length|partition_spec_id|  added_snapshot_id|
+-------+------------------+------+-----------------+-------------------+
|      0|s3://yassan-tes...|  8277|                0|2538362807379914821|
+-------+------------------+------+-----------------+-------------------+


==============================================================================================================
[FILES (データファイル)]
SELECT * FROM postgres.public.tbl_copy_txn.files

-- 結果 --
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+
|content|         file_path|file_format|spec_id|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts|        lower_bounds|        upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|first_row_id|referenced_data_file|content_offset|content_size_in_bytes|    readable_metrics|
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+
|      0|s3://yassan-tes...|    PARQUET|      0|     3475226|          71615025|        NULL|        NULL|             NULL|            NULL|{1 -> [01 00 00 0...|{1 -> [07 00 00 0...|        NULL|         NULL|        NULL|         NULL|        NULL|                NULL|          NULL|                 NULL|{{NULL, NULL, NUL...|
+-------+------------------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+------------+--------------------+--------------+---------------------+--------------------+


==============================================================================================================
[ENTRIES (マニフェストエントリ)]
SELECT * FROM postgres.public.tbl_copy_txn.entries

-- 結果 --
+------+-------------------+---------------+--------------------+------------------+--------------------+
|status|        snapshot_id|sequence_number|file_sequence_number|         data_file|    readable_metrics|
+------+-------------------+---------------+--------------------+------------------+--------------------+
|     1|2538362807379914821|              0|                   0|{0, s3://yassan...|{{NULL, NULL, NUL...|
+------+-------------------+---------------+--------------------+------------------+--------------------+

バケットの中を見ても成功したデータが残ってることも無さそう。

$ aws --profile sakura s3 ls s3://yassan-test/pg_lake/postgres/public/tbl_copy_txn/58579 --recursive
2026-02-22 11:15:55   71615025 pg_lake/postgres/public/tbl_copy_txn/58579/data/39b771a9-c674-41db-be41-082824669071/data_0.parquet
2026-02-22 11:15:55       2466 pg_lake/postgres/public/tbl_copy_txn/58579/metadata/00000-ef326f50-924b-4228-91ff-8e1d1d673817.metadata.json
2026-02-22 11:15:55       8277 pg_lake/postgres/public/tbl_copy_txn/58579/metadata/c6744213-3a56-4377-99ed-6db4295403df-m0.avro
2026-02-22 11:15:55       3796 pg_lake/postgres/public/tbl_copy_txn/58579/metadata/snap-2538362807379914821-1-c6744213-3a56-4377-99ed-6db4295403df.avro

おまけ:もしかして、2PC(分散トランザクション)っていける?

結論: ダメでした。

PREPARE TRANSACTION使って、2PC(分散トランザクション)いけるのか?って期待したい所ですが、以下のテストにあるように、少なくとも「Icebergメタデータ変更を含む書き込み」では pg_lake側が禁止していました。

https://github.com/Snowflake-Labs/pg_lake/blob/837e44bff67008af7d1b5f452764ee07eb09ff58/pg_lake_table/tests/pytests/test_iceberg_tmp_objects.py#L8-L30

ここでは触れませんが、PostgreSQLの設定に max_prepared_transactions = 10 を追加して PREPARE TRANSACTION 使えるようにを試したら、確かにテストの通り、cannot prepare a transaction that has Iceberg metadata changes となりました(て言うか、やってから気がついた)。

使いどころ(ユースケース)を考えてみる

個人的にはpg_lakeの「使い所」は大きくこのあたりだと考えます。

「PostgreSQL中心でやりたい」組織のレイクハウス基盤

状況

  • 既にアプリもBIも「全部PostgreSQL前提」で回している
  • だけど履歴データやログが肥大化して、
    • 本番DBに置き続けると遅い・コスト高い
    • とはいえ別のDWH・Sparkクラスターを立てるのは大ごと

pg_lake の使い所

  • Iceberg テーブルを PostgreSQLの CREATE TABLE ... USING iceberg だけで作成・更新 できる
  • PostgreSQL トランザクション境界に統合される(少なくとも COPY などでロールバックが効くことを確認した)
    • 一方で現状では 2PC は禁止される(Iceberg メタデータ変更を含むと prepare できない)
  • データファイル自体は S3/Blob/GCS 上の Parquet に置くから、ストレージコストやスケールはData Lakehouseの様に考えられる

😆 ポイント

  • OLTPは引き続きPostgreSQL、履歴・集計はIcebergテーブルへ
  • でも アプリやBIは全部PostgreSQLに接続、というシンプルな構成を作れる
  • テーブルがデカくなった場合にフルスキャンしたら詰むので慎重さは必要

Iceberg 中心のデータ基盤に「PostgreSQLの窓口」を付けたい

状況

  • 既に S3 + Iceberg + Spark/Trino といったData Lakehouse基盤がある
  • そこに「PostgreSQLからも同じIcebergを触りたい」ニーズがある
    • PostgreSQL 互換で扱いたい
    • JDBC/ODBC で簡単に接続したい

pg_lake の使い所

  • PostgreSQL互換インターフェースとしての「書き込み窓口」とする
  • 利用者はPostgreSQLとしてpg_lake経由にIcebergテーブルを意識せずに書き込みし、既存のData Lakehouseのクエリエンジンがpg_lakeをIcebergテーブルとして参照して取り込む

😆 ポイント

  • Icebergカタログの軽量なPostgreSQLフロント
  • アプリケーション または 運用チーム向けの窓口
  • 細粒度の頻繁な INSERT は small files 問題を引き起こす可能性があるので注意は必要
  • pg_lakeをApache Polaris(Iceberg REST Catalog)として連携して使うとより使い勝手が上がりそう12

S3上のファイルを“そのまま”分析したい

状況

  • S3 に CSV/JSON/Parquet が山ほどある
  • ざっと中身を見たい・軽く集計したいが、毎回ETLやSparkジョブを書くのは面倒

pg_lake の使い所

  • CREATE FOREIGN TABLE ... SERVER pg_lake OPTIONS (path 's3://...') で可能にする
  • 外部テーブルのスキーマのカラムの型は、参照するファイルから自動で推論される
  • 実際のスキャン・集計は DuckDB のカラムナエンジンが担当するので、 大きめのファイルでもPostgreSQL単体よりだいぶ速い

😆 ポイント

  • 「ちゃちゃっとつまみ食いしたい」を Postgres + SQL だけで済ませられる
  • Spark/Trinoを使う前の “最初の見る・試す” レイヤーとして使うイメージ

"軽量な"データパイプライン・ETLの代替

状況

  • バッチで全部DigdagやAirflowでワークフローを作るまでもないが、 ちょっとしたデータ移動・変換処理は定期的に走らせたい
  • でも「S3 ↔ Iceberg ↔ PostgreSQL」を行き来する処理は増えていく

pg_lake の使い所

  • COPY FROM 's3://...' / COPY TO 's3://...'S3⇔PostgreSQL/Icebergのロード・アンロードをSQLだけで記述
  • PL/pgSQL や DO ブロックと組み合わせて、 「月次ファイルをループして取り込む」「集計結果をParquetで吐き出す」といった処理を 全部PostgreSQL側のトランザクション下で完結できる

😆 ポイント

  • 大掛かりなETL基盤までは要らないけど、手作りスクリプトも増やしたくない
  • そこまで大きくない組織・チームの SQL中心のミドルクラスETL基盤として便利

地理空間データのレイクハウス化

ハンズオンでは取り上げませんでしたが、pg_lake_spatial 拡張を使うと PostGIS と組み合わせて 地理空間データも扱えます13

状況

  • PostGISで地理空間分析しているが、 元データ(Shapefile/GeoJSON/Parquetなど)はオブジェクトストレージに大量にある
  • 全部PostgreSQLに入れちゃうとサーバスペック的に厳しい

pg_lake の使い所

  • pg_lake_spatial が GDAL(Geospatial Data Abstraction Library) を経由して各種GEOフォーマットを読める
  • 外部ファイルとして読みつつ、PostGIS と組み合わせて空間クエリをかけられる
  • 必要な粒度でだけ Iceberg テーブルに入れとく、といった分割も可能

😆 ポイント

地理空間 ✕ データレイク として、生ファイル+PostGIS+Iceberg を1つのPostgreSQLエンドポイントで扱える

「開発向け運用分析」基盤

状況

  • マイクロサービスやアプリが多数あり、それぞれPostgreSQLを使っている
  • アプリ側で 軽い分析 または レポート(例:機械学習用特徴量抽出)なども求められる

pg_lake の使い所

  • PostgreSQLの通常のheapテーブル・Icebergテーブル・S3上のファイルを1つのSQLでJOIN可能
  • DuckDB側に重たい集計を寄せて、オンライン系PostgreSQLの負荷を抑えながら それでもアプリは「PostgreSQLに繋ぐだけ」で済む

😆 ポイント

  • ユースケース1~3の組み合わせ
  • 「アプリケーションDBのすぐ隣に小さなレイクハウスを生やす」ことで、 運用と分析のギャップを最小限にする

pg_lake を“使わない方がいい”ケース

ユースケースではないのですが、"NOT" ユースケースとして、
以下のようなケースでは、pg_lake を使わずに Spark/Flink/Trino を主役にした方が無難です。

  • 既に完全に成熟した大規模レイクハウス(Spark/Flink/Trino中心)があり、 PostgreSQLは本当にOLTP専用に閉じておきたい場合
  • 数百TB〜PBクラスを、多数のクエリエンジンが同時に叩くような超大規模分析クラスタ
  • きついSLA付きの超重バッチを全部PostgreSQLベースで回そうとしている場合

1ノードではどうしてもスループットを出すには限界があるので、このあたりは、素直に Spark/Flink/Trino を主役にして、PostgreSQLはメタデータや小さなマート用に留める方が安全です。

落とし穴と運用Tips

これまでの話を踏まえて、落とし穴と運用Tipsを整理します

公式Docker Composeはdownしたら揮発する

  • 落とし穴
    • データ永続化前提ではない構成14
    • PostgreSQLカタログが消えるとIceberg整合が崩れる
  • Tips
    • 本番はソースビルド or 永続ボリューム明示
    • PostgreSQLのバックアップ必須(物理/論理)

Iceberg JDBCカタログ互換は限定的

  • 落とし穴
    • SparkはV0指定で読めるが、V1自動マイグレーションで失敗
    • TrinoではV0指定してもV1を要求されエラー
  • Tips
    • pg_lakeをメインのIcebergカタログにしない
    • 既存Data LakehouseがあるならPolaris/HMS中心に

2PC不可(1PCのみ)

  • 落とし穴
    • PREPARE TRANSACTIONはIceberg変更を含むと禁止扱いとなる
    • 分散トランザクション基盤にはならない
  • Tips
    • 期待できるのは1PCの原子性のみ

Small FilesとVACUUM競合

  • 落とし穴
    • 細かいInsertを高頻度に実行すると自動VACUUM(10分間隔)と更新が衝突する可能性がある
  • Tips
    • そもそもそういった用途には向かない
    • VACUUMはテーブルの利用に合わせて手動と自動に分けて運用するのが良さそう

pgduck_serverはメモリ無限ではない

  • 落とし穴
    • 大規模スキャンでOOMの可能性
    • S3ファイルはローカルキャッシュされる
  • Tips
    • --memory_limit--cache_dirを明示して利用する
    • 実行計画とpushdownを確認する

まとめ

今回、Deep Diveしてみて個人的に pg_lake は、「Iceberg カタログそのもの」になるツールというよりも、

既存の PostgreSQL 世界とデータレイクをつなぐ“入口”として機能するツール

として捉えると設計判断がしやすいと考えます。 つまり、

PostgreSQL派にとっては、、、
「Snowflake風 or BigQuery風のことを、既存スキル+SQLだけでやるための延長線」

Iceberg派にとっては、、、
「既存のIcebergのData Lakehouseに、PostgreSQLという強力な“入口”を増やす手段」

この2つの視点でプロジェクトやチームのスケール、運用体制、既存スタックを見ながら 「どこまでをpg_lakeを活用するか」を決めていくのが良さそうです。

最後に

去年の年末は個人的に色々あって間に合わなかった分、気になってる所を結構潰しました。
心残りはコンテナで揮発しない構成を検討したが、やるには大規模改造になるのでここでは止めました。

pg_lakeは以下のドキュメントにあるように、Icebergテーブルを使ってて「これ欲しいんだよなぁ」ってなるIcebergテーブルへのImport/Exportがとても使いやすいです。

使い所にも挙げましたがハマるところもあるんじゃないでしょうか。これからもウォッチしていきます。 また、この記事書いてる途中にApache Polarisへの対応も出てきたのですが、これはまた別の機会に。

以上、この記事は さくらインターネット Advent Calendar 2025Distributed Computing Advent Calendar 2025 の16日目の記事でした。

これで2025年が越せました。


  1. 今回は取り上げないですが、pg_lakeでのIcebergテーブルとしての書き込みはまた制限が大きいです。以下で書き込みがサポート始まってきました。
    github.com
  2. Snowflake-Labs/pg_lakeのREADME より引用
  3. ソースだとこの辺を参考にしてます
  4. pg_map は 「key/value ペアを composite 型で表し、それを配列(domain)にしたもの」を map として扱う、という方針です。詳しくは pg_lake/docs/pg_map.md を参照。
  5. 何やってるのかは pg_lake/docker/Taskfile.yml を見ると分かります。
  6. 本来は専用のユーザを用意するべきだし、pg_hba.confももっと厳密に設定すべきです。今回はサンプル用なのでザル設定にしています。例えば、実運用では 0.0.0.0/0 のような全許可は避けるべきで。また、Docker ブリッジの CIDR に限定する、127.0.0.1 のみ公開する、専用ユーザを作成する等の制限を必ず行ってください。
  7. さくらのオブジェクトストレージを利用するためには、pgduck_serverがアクセスできるように、S3シークレットを作成する必要があります。S3シークレットの作成方法については、以下のドキュメントを参照してください。
    パーミッション設定によるアクセスキーの発行 - 基本操作 | さくらのクラウド マニュアル
  8. 例えば、PROVIDER credential_chainCHAIN 'env' を使い、KEY_IDSECRET を使わないで、ENV経由で渡すなど。
  9. 本来なら必要な依存JARをSparkイメージに追加する方が良いのですが、今回は簡単のために spark.jars.packages で指定しています。
    その場合、JARのキャッシュディレクトリをsparkコンテナの実行ユーザから書き込みできないといけないので、/opt/spark/ivy-cacheをボリュームマウントして、sparkコンテナをrootユーザで実行しています。
  10. Sparkを使っているドキュメントを見ると--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1 となっているのでかなり古いSpark&Icebergで確認しているようです。
  11. 少なくとも pg_lake [1/22時点の4bc1a58] / Spark v4.0.1 & Iceberg v1.10.1 / Trino v479 の組み合わせでは。
  12. Snowflake-Labs/pg_lake#83Snowflake-Labs/pg_lake#68 を見ると分かるようにREST APIカタログへの対応が進んでいて、Apache Polarisと組み合わせることで夢が広がる。
  13. 詳しくは pg_lake/docs/spatial.md を参照。これでもう1本書きたいところ。
  14. リポジトリにあるコンテナ環境 https://github.com/Snowflake-Labs/pg_lake/tree/main/docker は、docker compose downすると揮発するので本番には向かない。
    本番運用で使うなら、コンテナ使わずにソースビルドして使う方が無難



以上の内容はhttps://yassan.hatenablog.jp/entry/advent-calendar-2025-1216より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14