以下の内容はhttps://dshimizu.hatenablog.com/entry/2024/12/12/050153より取得しました。


Apache Icebergの概要を浅く掴みながらAmazon S3 Tablesを触ってみたメモ

はじめに

2024 AWS re:Invent で Amazon S3 Tables という新しいストレージサービスが発表されて話題になっていました。 データレイクに関連するサービスのようですが、あまりよくわかってないので、概要を理解するためにざっと調べながら触ってみました。

Amazon S3 Tables

Apache Iceberg 形式のテーブルをサポートするクラウドオブジェクトストレージサービス、のようです。

Apache Iceberg

Apache Iceberg をまだよくわかっていないのですが、2017年ごろからNetflixによって開発されたデータレイクにおける大量の構造化データと非構造化データを扱うためのテーブルの仕様であるようです(特定のソフトウェアとかではない)。 Apache Iceberg の他にも Apache Hudi、Delta Lake といったものもあるようで、Open Table Format と呼ばれるものに則った技術であるようです。

  • Introduction - Apache Iceberg™

    Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table.

従来のデータレイクについてもそれほどわかっていませんが、主に CSV, JSON, AVRO, Parquet、ORC といったフォーマットでデータが格納されているようです。 これらには以下のような課題があるようです。

  • ファイルの増加等によるパフォーマンス劣化
  • 同時アクセス時の整合性を担保できない
  • データが追加・更新されると過去の状態に復元できない
  • スキーマの変更が難しい

Apache Iceberg などのOpen Table Formatはこういった問題を解決するためのものであるようです。

AWS での Apache Iceberg

AWS でも Glue や EMR, Athena, Redshift はApache Icebergをサポートしているようでした。 この時データをS3に配置したデータに対して Athena などから Spark SQL を使用して、Iceberg、Hudi、Delta Lake 形式のテーブルフォーマットで作成・操作することはできたようです。

Amazon S3 Tables はIceberg形式のテーブル用のストレージサービスで、上記のような自己管理のものと比べて最大3倍のクエリ性能と10倍のトランザクション処理が可能となったようです

今のところ、以下のサービスとのテーブルの統合はプレビューのようです。

チュートリアルをやる

チュートリアルをやってみます。

環境

OS のバージョンは以下のとおりです。

% uname -ropsv
Linux 6.1.0-22-cloud-amd64 #1 SMP PREEMPT_DYNAMIC Debian 6.1.94-1 (2024-06-21) unknown GNU/Linux
% lsb_release -a
No LSB modules are available.
Distributor ID: Debian
Description:    Debian GNU/Linux 12 (bookworm)
Release:    12
Codename:   bookworm

aws cli v2 の場合は 2.22.10 以上が必要かと思います。

% aws --version
aws-cli/2.22.23 Python/3.12.6 Linux/6.1.0-22-cloud-amd64 exe/x86_64.debian.12

事前準備

IAM Identity Center を使っているので一時認証情報を取得して環境変数へセットします。

% export AWS_ACCESS_KEY_ID="****....****"
% export AWS_SECRET_ACCESS_KEY="****....****"
% export AWS_SESSION_TOKEN="****....****"

現状、S3 テーブルは東京リージョンでは使えないので us-east-1 を使うように環境変数へセットします。

% export AWS_REGION=us-east-1

Spark のインストール

% sudo apt update && sudo apt install openjdk-17-jdk
% mkdir $HOME/.local
% cd $HOME/.local
% curl -OL https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
% tar zxvf spark-3.5.4-bin-hadoop3.tgz

S3Tableの操作

S3 テーブルのバケットを作成します。 --profile--region は, AWS 関連の環境変数を設定していれば不要です。

% aws  --profile sandbox --region us-east-1 s3tables create-table-bucket --name ds-s3-tables-test

以下のようなレスポンスが返ってきます。

{
    "arn": "arn:aws:s3tables:us-east-1:123456789012:bucket/ds-s3-tables-test"
}

作成されたテーブルバケットを確認します

% aws --profile sandbox --region us-east-1 s3tables list-table-buckets

以下のようなレスポンスが返ってきます。

{
    "tableBuckets": [
        {
            "arn": "arn:aws:s3tables:us-east-1:123456789012:bucket/ds-s3-tables-test",
            "name": "ds-s3-tables-test",
            "ownerAccountId": "123456789012",
            "createdAt": "2024-12-11T03:34:20.863452+00:00"
        }
    ]
}

テーブルバケット内にテーブルを作成するときは、名前空間(namespace)と呼ばれる, 複数のテーブルを管理する論理的な入れ物が必要になるようです。 namespace 等の作成はCLIでも作成できるようですが、ここからはSparkで操作してみます。

Catalog はテーブルの現在のメタデータを追跡するためのもので、Sparkセッションで指定できるようです。

% ./bin/spark-shell \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3,software.amazon.awssdk:s3tables:2.29.26,software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.3 \
--conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
--conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:us-east-1:123456789012:bucket/ds-s3-tables-test \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

s3tablesbucket という Catalog 内に example_namespace という NAMESPACE を作成します。

scala> spark.sql("CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.example_namespace")

以下のようなレスポンスが返ってきます。

res1: org.apache.spark.sql.DataFrame = []

Table を作成してみます。

scala> spark.sql(
     | """ CREATE TABLE IF NOT EXISTS s3tablesbucket.example_namespace.`example_table` (
     |     id INT,
     |     name STRING,
     |     value INT
     | )
     | USING iceberg """
     | )

以下のようなレスポンスが返ってきます。

res2: org.apache.spark.sql.DataFrame = []

作成したテーブルにデータをInsertしてみます。

scala> spark.sql(
     | """
     |     INSERT INTO s3tablesbucket.example_namespace.example_table
     |     VALUES
     |         (1, 'ABC', 100),
     |         (2, 'XYZ', 200)
     | """)

以下のようなレスポンスが返ってきます。

res3: org.apache.spark.sql.DataFrame = []

データの検索を行ってみます。

scala> spark.sql(""" SELECT * FROM s3tablesbucket.example_namespace.example_table """).show()
+---+----+-----+
| id|name|value|
+---+----+-----+
|  1| ABC|  100|
|  2| XYZ|  200|
+---+----+-----+

終わりに

S3 Table と Apaceh IceBerg についてのドキュメント等を読みながら、Spark で S3 Table を操作してみました。 データレイクについての運用の知見があまりないので、この程度ではどういった点が良いのか等ピンときませんでしたが、概要については大まかに少しだけ掴めたと思います。

参考




以上の内容はhttps://dshimizu.hatenablog.com/entry/2024/12/12/050153より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14