前回はSQL Serverから読み込んだデータをCSVに出力しましたが、データ量が多い場合毎回全データ取得するのはDBにも負荷がかかるのであまりオススメされません。
そこでSQLServerからデータを読み出す際に追加分だけを取得するようにしてみます。
データの追加分だけSQLServerから取得するためには、embulk-input-sqlserverプラグインのincrementalパラメータとincremental_columnsパラメータを使用します。
このパラメータで、取得するテーブル内の主キーやまたは日付カラムを指定し、データを読み込んだ際にその最新の値(or日付)を保持しておき、次回の読み込み時にはその値を元に追加されたデータのみを指定して取得します。
公式ドキュメントの説明は以下からご確認ください。
以下のブログを参考にさせていただきました。
tech.griphone.co.jp
実施作業
テストデータ作成
以下のブログを参考にさせていただき、テスト用のデータを作ります。
mura-hiro.com
できたデータがこちらです。とりあえず100件ほど作成しました。

ほとんどコピペになりますが、追加したSQLは以下です。
USE TESTDB ;
GO
DECLARE @p_InsertNumber Bigint -- INSERTする行数
SELECT @p_InsertNumber=100; -- 100万に設定
WITH Base AS
(
SELECT
1 AS n
UNION ALL
SELECT
n + 1
FROM
Base
WHERE
n < @p_InsertNumber
),
Nums AS
(
SELECT
Row_Number() OVER(ORDER BY n) AS n
FROM
Base
)
INSERT INTO Employee
SELECT
n
, 'test'
, GETDATE()
FROM
Base
WHERE
n <= @p_InsertNumber
OPTION (MaxRecursion 0); -- 再帰クエリの再帰回数の上限をなくす
設定内容確認
次に前回CSV出力した際の設定ファイルの内容を確認します。
in:
type: sqlserver
driver_path: C:\drivers\sqljdbc_7.2\jpn\mssql-jdbc-7.2.2.jre8.jar
host: EC2AMAZ-L2BKFDH
user: dbuser
password: "******"
database: AdventureWorks2012
schema: Production
table: Product
select: "ProductID, Name, ProductNumber"
where: "ProductID < 500"
order_by: "ProductID ASC"
out:
type: file
path_prefix: C:\Users\Administrator\try1\csv\production_product
file_ext: csv
formatter:
type: csv
charset: UTF-8今回は前回の設定からselectパラメータ他を削除し、incrementalパラメータを2つ追加する変更を行います。
selectパラメータを削除する理由ですが、incrementalパラメータを設定することで処理実行時に内部で自動でSelect文が発行されるため、selectパラメータ他が記載されているとエラーになります。
※内部の処理としては、最初にincremental_columnsで設定したパラメータでソートが行われた後、Select文が実行されます。
in:
type: sqlserver
driver_path: C:\drivers\sqljdbc_7.2\jpn\mssql-jdbc-7.2.2.jre8.jar
host: EC2AMAZ-L2BKFDH
user: dbuser
password: "******"
database: TESTDB
schema: dbo
table: Employee
incremental: true
incremental_columns:
- No
out:
type: file
path_prefix: C:\Users\Administrator\try1\csv\testdb_employee_no
file_ext: csv
formatter:
type: csv
charset: UTF-8
guessコマンド実行
guessコマンドを実行して、設定ファイルを出力します。
embulk guess .\try1\sqlserver_incremental_to_csv.yml -o config_inc_to_csv.yml
コマンドが正常終了していることを確認します。

出力された設定内容は以下のようになっています。

runコマンド実行(初回実行)
以下のコマンドを実行します。
前回と違うのは、-cオプションで差分の情報を保持しているファイルを指定しており、処理実行後は同ファイルを最新の値で更新します。
embulk run .\config_inc_to_csv.yml -c embulk_sql_inc_csv.diff.yaml
エラーなく正常終了しました。

ファイルも想定通りに出力されています。

出力メッセージ確認(初回実行)
出力されたメッセージを見ると、以下のSQLが実行されていることがわかります。
今回は初回実行で差分情報ファイルが存在していかなかったため、全件取得するような処理になっていました。
2020-03-13 14:38:07.365 +0900 [INFO] (0014:task-0000): SQL: SELECT * FROM "dbo"."Employee" ORDER BY "No"
最後の以下のメッセージで、incremental_columnsカラムに設定したカラムの最新の値がlast_recordとして設定されていることがわかります。
2020-03-13 14:38:07.607 +0900 [INFO] (main): Next config diff: {"in":{"last_record":[100]},"out":{}}差分情報ファイル内容は以下のようになっています。

ファイルの中身も取得したデータが出力されています。

テストデータ追加
ここからが本題になります。
追加でテストデータを100件登録します。

追加するSQLは以下です。またもやほぼコピペです。
USE TESTDB ;
GO
DECLARE @p_InsertNumber Bigint -- INSERTする行数
SELECT @p_InsertNumber=200; -- 100万に設定
WITH Base AS
(
SELECT
101 AS n
UNION ALL
SELECT
n + 1
FROM
Base
WHERE
n < @p_InsertNumber
),
Nums AS
(
SELECT
Row_Number() OVER(ORDER BY n) AS n
FROM
Base
)
INSERT INTO Employee
SELECT
n
, 'test'
, GETDATE()
FROM
Base
WHERE
n <= @p_InsertNumber
OPTION (MaxRecursion 0); -- 再帰クエリの再帰回数の上限をなくす
runコマンド実行(2回目実行)
テストデータの追加ができたら再度runコマンドを実行して、エラーが出ていないことを確認します。

ファイルが出力されていることを確認します。

ファイルの中身を確認すると、増分データのみが出力されていることが確認できました。

出力メッセージ確認(2回目実行)
rumコマンド実行時に出力された内容を確認すると、以下のSQLが実行されていることがわかります。
設定ファイルで指定したカラムをorder byでソートした後、Where句で追加されたデータのみが取得されるように条件指定されています。
2020-03-13 15:03:36.444 +0900 [INFO] (0014:task-0000): SQL: SELECT * FROM "dbo"."Employee" WHERE (("No" > ?)) ORDER BY "
No"メッセージの最後にカラムの最新の値を更新して処理が終了されていることも確認できました。
2020-03-13 15:03:36.755 +0900 [INFO] (main): Next config diff: {"in":{"last_record":[200]},"out":{}}
感想及び所感
ウォッチするカラム名は主キーである必要はないようですが、データのロストを考えると主キーで設定したほうが間違いないですね。
日付カラムでも指定できそうですので、次回は日付カラムを指定して、同時間帯のデータがあった場合にどのような処理となるかをみていきます。