Kafka Connect
Last time we used Kafka Connect to load files from the local filesystem into Kafka. This time we will connect an RDB (PostgreSQL) to Kafka on the source side, and Kafka to S3 on the sink side.
Architecture

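The rough data flow, using the host names that appear in the commands below, is:

PostgreSQL (kafka-1) --JDBC Source Connector--> Kafka topic db_test_table (kafka-2)
Kafka topic db_test_table --S3 Sink Connector--> S3 bucket test-kafka-connect1234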
Source Side
Creating the Database
yum -y install postgresql-server
postgresql-setup initdb
echo "listen_addresses = '*'" >> /var/lib/pgsql/data/postgresql.conf
cp /var/lib/pgsql/data/pg_hba.conf /var/lib/pgsql/data/pg_hba.conf.org
cat << EOF > /var/lib/pgsql/data/pg_hba.conf
# PostgreSQL Client Authentication Configuration File
# ===================================================
local   all   all                    trust
host    all   all   127.0.0.1/32     trust
host    all   all   192.168.0.0/16   md5
EOF
systemctl restart postgresql
psql -U postgres -W -c "CREATE DATABASE testdb;"
Then connect to the new database (for example with psql -U postgres -d testdb) and create the table, the user, and some test data:
CREATE TABLE test_table (
seq bigint PRIMARY KEY,
item varchar(256)
);
CREATE USER connectuser with password 'connectuser';
GRANT ALL ON test_table TO connectuser;
INSERT INTO test_table(seq, item) VALUES (1, 'hoge');
INSERT INTO test_table(seq, item) VALUES (2, 'apple');
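As a quick sanity check, you can confirm the rows are in place (the local trust entry in pg_hba.conf above lets this run without a password):

psql -U postgres -d testdb -c "SELECT * FROM test_table;"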
Kafka Connect Configuration for Loading
cp -pv /etc/kafka/connect-distributed.properties /etc/kafka/connect-distributed-2.properties
Edit the following keys (the ### lines show the original values):
vim /etc/kafka/connect-distributed-2.properties
###bootstrap.servers=localhost:9092
bootstrap.servers=kafka-2:9092
###group.id=connect-cluster
group.id=connect-cluster-datahub-2
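A quick grep is enough to verify the changed keys took effect:

grep -E '^(bootstrap.servers|group.id)' /etc/kafka/connect-distributed-2.properties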
cat << EOF > test_db.json
{
  "name": "load-db-sample-data",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://kafka-1:5432/testdb",
    "connection.user": "connectuser",
    "connection.password": "connectuser",
    "mode": "incrementing",
    "incrementing.column.name": "seq",
    "table.whitelist": "test_table",
    "topic.prefix": "db_",
    "tasks.max": "1"
  }
}
EOF
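With "mode": "incrementing", the connector remembers the highest seq value it has delivered and only fetches rows beyond it on each poll. Conceptually, every poll runs a query along these lines (a sketch of the idea, not the connector's literal SQL):

SELECT * FROM test_table WHERE seq > ? ORDER BY seq ASC  -- ? = last committed seq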
curl -X POST -H "Content-Type: application/json" http://kafka-2:8083/connectors -d @test_db.json
curl http://kafka-2:8083/connectors
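The Connect REST API also exposes a per-connector status endpoint, useful for confirming that the connector and its task are in the RUNNING state:

curl http://kafka-2:8083/connectors/load-db-sample-data/status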
Starting Kafka Connect
Note that the REST calls above only succeed once this worker is running, so if the POST failed with a connection error, start the worker first and re-register the connector.
connect-distributed /etc/kafka/connect-distributed-2.properties
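Once the worker is up, the REST API root responds with version information, which doubles as a quick health check:

curl http://kafka-2:8083/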
Checking the Kafka topic from another server shows that the data has been stored.
[root@kafka-3 ~]# kafka-console-consumer --bootstrap-server kafka-2:9092 --topic db_test_table --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":1,"item":"hoge"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":2,"item":"apple"}}
In this state, insert another row into PostgreSQL on kafka-1.
psql -h kafka-1 -p 5432 -d testdb -U connectuser -c "INSERT INTO test_table(seq, item) VALUES (3, 'banana');"
Password for user connectuser:
INSERT 0 1
You can see that the new row has been appended to Kafka.
[root@kafka-3 ~]# kafka-console-consumer --bootstrap-server kafka-2:9092 --topic db_test_table --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":1,"item":"hoge"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":2,"item":"apple"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":3,"item":"banana"}}
Sink Side
Now that the source side is loading data, let's set up the sink side.
Creating the S3 Bucket
The S3 bucket was created from the AWS console.
Create the AWS credentials file under the following directory:
mkdir ~/.aws
cat << EOF > ~/.aws/credentials
[default]
aws_access_key_id = ********************
aws_secret_access_key = ********************
EOF
chmod 600 ~/.aws/credentials
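If the AWS CLI is installed, it can confirm that the credentials file is being picked up:

aws sts get-caller-identity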
Kafka Connect Configuration for the Sink
cat << EOF > test_db_sink.json
{
  "name": "sink-db-sample-data",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.bucket.name": "test-kafka-connect1234",
    "s3.region": "ap-northeast-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": 1,
    "topics": "db_test_table",
    "tasks.max": "1"
  }
}
EOF
curl -X POST -H "Content-Type: application/json" http://kafka-2:8083/connectors -d @test_db_sink.json
curl http://kafka-2:8083/connectors
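As on the source side, the status endpoint shows whether the sink connector and its task are RUNNING:

curl http://kafka-2:8083/connectors/sink-db-sample-data/status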
PGPASSWORD=connectuser psql -h kafka-1 -p 5432 -d testdb -U connectuser -c "INSERT INTO test_table(seq, item) VALUES (4, 'grape');"
INSERT 0 1
You can see that the records have been written to S3 as JSON.
C:\Users\User>aws s3 ls s3://test-kafka-connect1234/topics/db_test_table/partition=0/
2019-03-20 23:54:19         24 db_test_table+0+0000000000.json
2019-03-20 23:54:21         25 db_test_table+0+0000000001.json
2019-03-20 23:54:22         26 db_test_table+0+0000000002.json
2019-03-20 23:54:25         25 db_test_table+0+0000000003.json
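To peek inside one of the objects, aws s3 cp can stream it to stdout (the key here is taken from the listing above):

aws s3 cp s3://test-kafka-connect1234/topics/db_test_table/partition=0/db_test_table+0+0000000003.json -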
Finally, delete the connectors and confirm they are gone:
curl -X DELETE http://kafka-2:8083/connectors/load-db-sample-data
curl -X DELETE http://kafka-2:8083/connectors/sink-db-sample-data
curl http://kafka-2:8083/connectors