以前からPG-Stromのパッケージにpg2arrowというユーティリティを同梱しており、これを使うと、PostgreSQLに投げたクエリからApache Arrow形式のファイルを作成する事ができる。
kaigai.hatenablog.com
qiita.com
昨年、当初のバージョンを作った時から、内部的には色々ゴチャゴチャ変わっていて*1、Arrow_Fdwとコードを共有するための改良や、RDBMSへの接続に固有の部分だけを別ファイルに切り出すという事をやっていた。
これは、PostgreSQLだけをデータソースにするのではなく、Webアプリやゲームの業界でよく使われる MySQL や、将来的にはNoSQLなどへも簡易に対応できるようにという意味での基礎工事のようなものである。今回はまず、これを MySQL に対応させてみた。
MySQLからWebアプリやゲームのログ情報を Apache Arrow 形式で抜き出し、これを単純なファイルとして NVME-SSD 上のボリュームに保存する。
これらのファイルを Arrow_Fdw 外部テーブルを用いて PostgreSQL にマッピングすれば、解析系DBにわざわざデータを再度インポートしなくても、Webアプリやゲームのログを集計処理や異常検知に回す事ができるようになる。
加えて、PG-StromであればArrow_Fdw外部テーブルに対してSSD-to-GPU Direct SQLを実行する事ができるので、きちんとシステムを設計してやれば、秒速で10億レコード超を処理する事だって不可能ではない。
使い方自体はそれほど複雑なものではない。
大半のオプションが pg2arrow と共通*2で、今回の機能強化に合わせて-tオプションを追加した程度である。
$ mysql2arrow --help
Usage:
mysql2arrow [OPTION] [database] [username]
General options:
-d, --dbname=DBNAME Database name to connect to
-c, --command=COMMAND SQL command to run
-t, --table=TABLENAME Table name to be dumped
(-c and -t are exclusive, either of them must be given)
-o, --output=FILENAME result file in Apache Arrow format
--append=FILENAME result Apache Arrow file to be appended
(--output and --append are exclusive. If neither of them
are given, it creates a temporary file.)
Arrow format options:
-s, --segment-size=SIZE size of record batch for each
Connection options:
-h, --host=HOSTNAME database server host
-p, --port=PORT database server port
-u, --user=USERNAME database user name
-P, --password=PASS Password to use when connecting to server
Other options:
--dump=FILENAME dump information of arrow file
--progress shows progress of the job
--set=NAME:VALUE config option to set before SQL execution
--help shows this message
Report bugs to <pgstrom@heterodb.com>.簡単な例でデータを抽出してみる。
なお-tオプションは、SELECT * FROM tablenameの省略形。
$ mysql2arrow -d mysql -u root -t t1 -o /dev/shm/hoge.arrow
生成された Apache Arrow ファイルのスキーマ定義、データの配置はこんな感じ
$ mysql2arrow --dump /dev/shm/hoge.arrow
[Footer]
{Footer: version=V4, schema={Schema: endianness=little, fields=[{Field: name="id", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="a", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="b", nullable=true, type={Float64}, children=[], custom_metadata=[]}, {Field: name="c", nullable=true, type={Utf8}, children=[], custom_metadata=[]}, {Field: name="d", nullable=true, type={Timestamp: unit=sec}, children=[], custom_metadata=[]}], custom_metadata=[{KeyValue: key="sql_command" value="SELECT * FROM t1"}]}, dictionaries=[], recordBatches=[{Block: offset=472, metaDataLength=360 bodyLength=60480}]}
[Record Batch 0]
{Block: offset=472, metaDataLength=360 bodyLength=60480}
{Message: version=V4, body={RecordBatch: length=1000, nodes=[{FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=26}, {FieldNode: length=1000, null_count=18}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=17}], buffers=[{Buffer: offset=0, length=0}, {Buffer: offset=0, length=4032}, {Buffer: offset=4032, length=128}, {Buffer: offset=4160, length=4032}, {Buffer: offset=8192, length=128}, {Buffer: offset=8320, length=8000}, {Buffer: offset=16320, length=0}, {Buffer: offset=16320, length=4032}, {Buffer: offset=20352, length=32000}, {Buffer: offset=52352, length=128}, {Buffer: offset=52480, length=8000}]}, bodyLength=60480}Python (PyArrow) で読み込んでみるとこんな感じですね。
$ python
Python 3.6.8 (default, Oct 7 2019, 17:58:22)
[GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> f = pa.ipc.open_file('/dev/shm/hoge.arrow')
>>> f.get_record_batch(0).to_pandas()
id a b c d
0 1 750.0 884.851090 c4ca4238a0b923820dcc509a6f75849b 2020-11-25 17:14:56
1 2 962.0 373.533847 c81e728d9d4c2f636f067f89cc14862c 2019-03-26 01:19:29
2 3 287.0 384.895995 eccbc87e4b5ce2fe28308fd9f2a7baf3 2018-11-04 21:55:32
3 4 573.0 890.063600 a87ff679a2f3e71d9181a67b7542122c 2023-05-14 15:24:37
4 5 948.0 778.885925 e4da3b7fbbce2345d7772b0674a318d5 2023-01-18 16:41:12
.. ... ... ... ... ...
995 996 -295.0 424.007169 0b8aff0438617c055eb55f0ba5d226fa 2017-08-15 01:40:31
996 997 -849.0 648.545034 ec5aa0b7846082a2415f0902f0da88f2 2023-05-23 08:58:55
997 998 530.0 865.244230 9ab0d88431732957a618d4a469a0d4c3 2024-07-20 14:13:06
998 999 244.0 96.534528 b706835de79a2b4e80506f582af3676a 2018-08-10 01:42:04
999 1000 997.0 157.958900 a9b7ba70783b617e9998dc4dd82eb3c5 2016-12-27 08:06:44
[1000 rows x 5 columns]ビルドは PG-Strom のモジュールと一緒にやればよいのですが、mysql-develパッケージをインストールしていない人もいるという想定で((なおpostgresql-develパッケージは全人類がインストールするという想定で))、makeの実行時にWITH_MYSQL2ARROW=1を付加します。
$ make WITH_MYSQL2ARROW=1 gcc -D__MYSQL2ARROW__=1 -D_GNU_SOURCE -g -Wall -I ../src -I ../utils -I /usr/local/pgsql-11/include/server -I/usr/include/mysql -m64 -L/usr/lib64/mysql -Wl,-rpath,-L/usr/lib64/mysql ../utils/sql2arrow.c ../utils/mysql_client.c ../src/arrow_nodes.c ../src/arrow_write.c -o ../utils/mysql2arrow -lmysqlclient
追記モードで異常終了した時のファイルの回復
もう一点。MySQL対応にするついでに、以前からあった設計上の問題の修正を行っている。
pg2arrowやmysql2arrowで--appendを指定し、追記モードでSQLの処理結果をApache Arrowファイルに追加する場合、以前のエントリで紹介したように、ファイル末尾のフッター領域を上書きして新しいデータを追加し、最後にフッター領域を再構築する。
この時、SQLの異常終了やコマンド自体のバグによってプロセスが異常終了してしまったら、元々のApache Arrowファイルが破損したまま残ってしまう事になっていた。
これを修正するため、最新版では元々のApache Arrowファイルのフッタ領域の内容(このサイズ自体は大した量ではないので)を別の領域に退避し、シグナルハンドラとon_exit()ハンドラを用いて、終了コード 0 以外でプロセスが exit したり、SIGSEGVやSIGBUSを受け取った場合にはこれを元の位置に書き戻すという処理を行っている。
例えば、6GB程度の大きさがあるテーブル t0 から100行だけ取り出す。これは生成された Apache Arrow ファイルも5kB程度のもの。
$ pg2arrow -d postgres -c "SELECT * FROM t0 LIMIT 100" -o /dev/shm/monu.arrow $ ls -l /dev/shm/monu.arrow -rw-r--r--. 1 kaigai users 4934 Mar 25 12:41 /dev/shm/monu.arrow
ここに、今度はテーブル全体を追記中にコマンドを ctrl-c で中断してみる事にする。
$ pg2arrow -d postgres -c "SELECT * FROM t0" --append /dev/shm/monu.arrow ^C
別ターミナルでファイルの大きさを観察してみると、確かに途中までデータが書き込まれ、順調にApache Arrowファイルが肥大化している事が分かるが、pg2arrowの異常終了後、最終的には元の大きさに戻っている。
$ ls -lh /dev/shm/monu.arrow -rw-r--r--. 1 kaigai users 769M Mar 25 12:46 /dev/shm/monu.arrow $ ls -lh /dev/shm/monu.arrow -rw-r--r--. 1 kaigai users 1.1G Mar 25 12:47 /dev/shm/monu.arrow $ ls -lh /dev/shm/monu.arrow -rw-r--r--. 1 kaigai users 1.3G Mar 25 12:47 /dev/shm/monu.arrow $ ls -l /dev/shm/monu.arrow -rw-r--r--. 1 kaigai users 4934 Mar 25 12:47 /dev/shm/monu.arrow
PyArrowで当該ファイルをオープンしてみても、元通り100行のデータを含む Apache Arrow ファイルである。
$ python
>>> import pyarrow as pa
>>> f = pa.ipc.open_file('/dev/shm/monu.arrow')
>>> f.get_record_batch(0).num_rows
100
>>> f.num_record_batches
1
>>> f.get_record_batch(0).num_rows
100SIGKILLで強制終了した場合など救えないケースもあるが、一応、こういった運用面での安定性に寄与する機能も強化されているという事で。

