みなさんこんにちは、レッドハットでソリューションアーキテクトをしている暮林といいます。
前回はKaravan Designerがどんなものかということを紹介いたしました。
今回の記事では、前回の予告通りノーコードでapache KafkaのPoCができるような使い方の紹介をしたいとおもいます。 Karavan DesignerにAtlas Mapを組み合わせていきます。
AtlasMapとは
AtlasMapはデータマッピングソリューションです。
データマッピングというのは、あるサービスと別のサービス(またはデータベースなど)を接続するときに、どの項目をどの項目に移送するのか、または編集を施すのか、といった作業のことです。
このときに複雑な変換を行う必要がある場合と、よくある編集操作(たとえばTrimや文字数でカットなど)で済む場合があります。AtrasMapは後者で済んでしまうような場合に効果を発揮します。もちろんこの手の編集は通常のプログラミングでも行うことはできますが、Boring Jobになること間違いなしです。
使ってみる
AtlasMapは実行可能Jarとして配布されていますが、VSCodeプラグインとしても起動できます。とはいえVSCode内で実行可能Jarを起動しているだけっぽいので、Javaが実行できる環境は必要です。
プラグインインストール
VSCodeのマーケットプレースから「AtlasMap」を検索して、「AtlasMap Data Transformation editor by Red Hat」をインストールしてください。

空ファイル作成
AtlasMapプラグインは「*.adm」という拡張子に反応します。ここではためしに、「order-mapping.adm」というファイルをVSCodeのファイルメニューから作成して、ダブルクリックしてみましょう。

AtlasMapのエディターが開きましたね!
早速マッピングしてみる
早速マッピングをしてみましょう。JsonやXMLのスキーマ、もしくはインスタンスをインポートしましょう。

少し長いですが、今回はこのJsonをつかってみましょう。適当な名前で保存してから、上記のメニューからインポートしてください。インスタンスかスキーマかをきれるので、インスタンスを選んでください。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "OrderId"
},
{
"type": "string",
"optional": false,
"field": "OrderType"
},
{
"type": "string",
"optional": false,
"field": "OrderItemName"
},
{
"type": "int32",
"optional": false,
"field": "Quantity"
},
{
"type": "string",
"optional": false,
"field": "Price"
},
{
"type": "string",
"optional": false,
"field": "ShipmentAddress"
},
{
"type": "string",
"optional": false,
"field": "ZipCode"
},
{
"type": "string",
"optional": false,
"field": "OrderUser"
}
],
"optional": true,
"name": "user1.earth.dbo.Orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "OrderId"
},
{
"type": "string",
"optional": false,
"field": "OrderType"
},
{
"type": "string",
"optional": false,
"field": "OrderItemName"
},
{
"type": "int32",
"optional": false,
"field": "Quantity"
},
{
"type": "string",
"optional": false,
"field": "Price"
},
{
"type": "string",
"optional": false,
"field": "ShipmentAddress"
},
{
"type": "string",
"optional": false,
"field": "ZipCode"
},
{
"type": "string",
"optional": false,
"field": "OrderUser"
}
],
"optional": true,
"name": "user1.earth.dbo.Orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "change_lsn"
},
{
"type": "string",
"optional": true,
"field": "commit_lsn"
},
{
"type": "int64",
"optional": true,
"field": "event_serial_no"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "user1.earth.dbo.Orders.Envelope"
},
"payload": {
"before": null,
"after": {
"OrderId": 1,
"OrderType": "E",
"OrderItemName": "Lime",
"Quantity": 100,
"Price": "3.69",
"ShipmentAddress": "541-428 Nulla Avenue",
"ZipCode": "4286",
"OrderUser": "user1"
},
"source": {
"version": "1.1.2.Final-redhat-00001",
"connector": "sqlserver",
"name": "user1.earth",
"ts_ms": 1636711635163,
"snapshot": "true",
"db": "InternationalDB",
"schema": "dbo",
"table": "Orders",
"change_lsn": null,
"commit_lsn": "00000049:00007a08:0003",
"event_serial_no": null
},
"op": "r",
"ts_ms": 1636711635166,
"transaction": null
}
}
このJsonはDebeziumというChange Data Captureソリューションが出力するJsonの例です。アプリとしては前半部分のスキーマが不要で、payloadの中のafterの中身だけが欲しいと想定します。今度はTarget側でインポートしましょう。(これも一旦適当なファイルに保存してください)
{
"OrderId": 1,
"OrderType": "E",
"OrderItemName": "Lime",
"Quantity": 100,
"Price": "3.69",
"ShipmentAddress": "541-428 Nulla Avenue",
"ZipCode": "4286",
"OrderUser": "user1"
}
こんな感じになったはずです。(Payload→Afterと開いてあります)

つなげてみる
勘のいい方はここからの操作は想像がついているかもしれません。左側の項目を右側の項目のマッピングしたい項目にドラッグアンドドロップしてみてください。

線が引けましたね。あとは他の項目もどんどん線をひいていきましょう。

あとはファイルをセーブしてマッピングの作成は完了です!admファイルはそのまま利用します。
Karavan Designerから利用する
では当初の目的に戻ってKaravan DesignerからできあがったAtlasMapを利用するDSLを書いてみましょう。 まず最初のステップはKafka TopicNot Secured Sourceからです。検索ボックスに「Kafka」といれてSourceタブを選んでください。


ひとまずパラメータの設定はおいておきましょう。次がAtlasMapのステップです。検索ボックスにatlasといれてProducerタブを選んでください。


最後はまたKafkaですね。検索ボックスに「Kafka」といれてSinkタブを選んでください。


こんな感じになったと思います。この例では色気を出して途中でログを出力しています。(Log Sinkをつかっています) 現状のYAML DSLはこんなかんじです。
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
name: kafkatokafka.yaml
spec:
flows:
- from:
uri: 'kamelet:kafka-not-secured-source'
steps:
- to:
uri: atlasmap
- kamelet:
name: log-sink
- kamelet:
name: kafka-not-secured-sink
あとはそれぞれのステップをクリックしてパラメータをうめましょう。
Kafka not Secured SourceについてはBrokerとTopic Namesだけいれれば十分ですが、Consumer GroupまでいれておくとKafka側でどこまで読んだか覚えてくれるようになります。(Consumer GroupはConsumerをスケールアウトしたときにきちんとそれぞれのプロセス割り振るためのパラメータですが、Consumerが1多重でもどこまで読んだかをKafka側に記憶してもらうために使うことができます)
次はAtlasMapのパラメーターです。ここはちょっと覚えゲー的なところがありますが、Resource Uriにこのようにいれてください。
file:/etc/camel/resources/order-mapping.adm

パラメータを入れ終わったらファイルメニューからセーブしておきましょう。今回の例では最終的にこうなりました。
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
name: kafkatokafka.yaml
spec:
flows:
- from:
uri: 'kamelet:kafka-not-secured-source'
steps:
- to:
uri: 'atlasmap:file:/etc/camel/resources/order-mapping.adm'
- kamelet:
name: log-sink
- kamelet:
name: kafka-not-secured-sink
parameters:
brokers: my-cluster.svc
topic: outcoming-topic
parameters:
brokers: 'my-cluster.svc:9092'
topic: incoming-topic
consumerGroup: group1
これでKafka Topicを読み出して、項目を編集、ログを出して、別のKafka Topicに出力するDSLがかけました! おいおい、これじゃAtlasMapのadmファイルローカルにあるままだぞ?感じですが、Camel-Kを実行するコマンドで --resource オプションとして指定してあげるとadmファイルがクラスパスの通っている場所にアップロードされる仕組みになっています。(下記のコマンドはadmファイルをmapディレクトリに置いている前提のコマンドです)
kamel run kafkatokafka.yaml --resource file:map/order-mapping.adm
まとめ
今回はAtlasMapでデータマッピングを作成して、Karavan Designerのステップに組み込む方法をおつたえいたしました。 例として前後をKafkaとましたが、Camel-KのSourceとSinkには様々なサービスが使えることがチラチラと見えたと思いますので、同じようなやりかたで別々のサービスをつなげることができます。
そうはいってもREST APIで普通受けるよね?それはどうやるの?と思った方は下記のブログを読んでみてください。OpenAPI Specを使ってhttpの受け口を自動生成する方法です。
Karavan Designerについて2回の連載を行いましたがいかがでしたでしょううか?エンジニアを駆り出さなくてもサービスとサービスの接続ができそうな気がして来た!という方がいらっしゃれば幸いです。
