Apache Camel Advent Calendar 9日目の記事は、サポート担当古市が担当します。 テーマは、「Camel Kafka コンポーネント」

Openshiftなどのクラウド環境では camel-kafka コンポーネントをご利用されているお客様がとても多い印象です。 クライアントや他サービスといった前段からのリクエストをルーティングするといった用途ではなく、クラウド上で生成されるログなどをルーティングする事例が増えています。
ご利用するにあたり、今回も製品ドキュメントもしくはコミュニティードキュメントを確認していただく方法を強くお勧めします。 ここでは、コミュニティードキュメントを見ていきます。
オープンソースコミュニティーならではといった感じですが、camel 2.13 から提供開始された camel-kafka コンポーネントも非常に豊富なオプションを提供しています。 "Example"セクションには、多数サンプルが記述されています。きっとイメージしている使用例が見つかるはずですので、一通り見ていただくと良いです。
検証する際には docker で kafka broker インスタンスを用意するのが一般的な方法かと思います。
https://hub.docker.com/r/apache/kafka
せっかくなので、今回は camel-jbang を使い kafka broker インスタンスを用意してみます。 infra コマンドを使いコンテナの起動などの管理を行えます。 list オプションを使い、kafka に対応しているか確認します。
% camel infra list | grep kafka kafka confluent, redpanda, strimzi Apache Kafka, Distributed event streaming platform %
strimzi 実装の kafka broker を起動します
% camel infra run kafka strimzi
Starting service kafka with implementation strimzi
{
"brokers" : "localhost:9092",
"getBootstrapServers" : "localhost:9092"
}
別ターミナルで、稼働中のサービスリストを表示します。
% camel infra ps ALIAS IMPLEMENTATION DESCRIPTION kafka confluent, redpanda, strimzi Apache Kafka, Distributed event streaming platform %
getオプションで、サービス接続用のURLを確認可能です。
% camel infra get kafka
{
"brokers" : "localhost:9092",
"getBootstrapServers" : "localhost:9092"
}
%
kafka broker のログを確認することも可能です。
camel infra log kafka
kafka broker が立ち上がったので、疎通確認をしてみます。 camel-jbang では簡単にメッセージを確認できるよう、camel route を用意せずに consumer endpoint だけを指定し、メッセージを受信することも可能です。
camel cmd send --poll --endpoint='kafka:test?brokers=localhost:9092'
別ターミナルで、kafka broker へメッセージを送信します。メッセージ送信も camel-jbang で実現できます。
% camel cmd send --body='Hello World' --uri='kafka:test?brokers=localhost:9092' 2025-08-26 13:43:47.921 29144 --- kafka://test?brokers=localhost:9092 : Sent (success) (261ms) %
メッセージ送信後、camel cmd send -poll を実行していたターミナルでは、このようなログが出力されるはずです。
2025-08-26 13:43:44.905 29120 --- kafka://test?brokers=localhost:9092 : Poll received (success) (3s282ms) Endpoint kafka://test?brokers=localhost:9092 Exchange (DefaultExchange) InOnly 526363B4E3B0BA0-0000000000000000 Message (DefaultMessage) Header (Long) CamelMessageTimestamp 1756183428168 (2025-08-26 13:43:48 / 0s ago) Header (RecordHeaders) kafka.HEADERS RecordHeaders(headers = [], isReadOnly = false) Header (Long) kafka.OFFSET 12 Header (Integer) kafka.PARTITION 0 Header (Long) kafka.TIMESTAMP 1756183428168 Header (String) kafka.TOPIC test Body (String) (bytes: 11) Hello World %
これで kafka broker の疎通確認ができました。 前回用意した camel-ftp コンポーネントを使った camel route を改修してみます。
% cat Hello.java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
public class Hello extends RouteBuilder {
@Override
public void configure() throws Exception {
from("file:data?charset=utf-8")
.split(body().tokenize("\n"))
.unmarshal().csv()
.marshal().json(JsonLibrary.Jackson)
.to("kafka:test?brokers=localhost:9092");
from("kafka:test?brokers=localhost:9092")
.log("received: ${body}");
}
}
CSVファイルを読み込んだ後、改行コードを使いメッセージ分割し、それぞれ kafka broker へ jsonフォーマットで送信します。 それとは別に kafka broker からメッセージを読み込み、ログ出力する camel route も用意しました。
実行すると、メッセージを出力するログを確認できます。
% camel run Hello.java ~割愛~ 2025-08-26 11:25:22.764 INFO 7312 --- [aConsumer[test]] Hello.java:15 : received: [["1","Bob","23"]] 2025-08-26 11:25:22.769 INFO 7312 --- [aConsumer[test]] Hello.java:15 : received: [["2","Nancy","30"]] 2025-08-26 11:25:22.773 INFO 7312 --- [aConsumer[test]] Hello.java:15 : received: [["3","David","18"]]
検証が終了したら、infra コマンドでサービスを停止します。
% camel infra stop kafka Shutting down service kafka (PID: 3540)
このサンプルは、業務シナリオとかけ離れているので退屈に感じるかもしれません。ポイントとしては以下になります。
- ファイルデータを取り込む (camel が提供する豊富な component に置き換え可能です)
- 取り込んだデータを分割や加工 (フォーマット変換はもちろん、データ修正、条件式を使った分岐も可能です)
- kafka broker へ送信
- camel-jbang を使い、簡単に検証
camel-file / camel-kafka コンポーネントの記事を通して camel component を試す際、コミュニティードキュメントを確認後、 camel-jbang を使い試す流れを体感できたかと思います。 皆さんが興味のある camel component を見つけ、camel-jbang を使い試していただけば幸いです。
そうそう、各component にはそれぞれ専用オプションが豊富に用意されています。簡易版とはなりますが、camel-jbang でそれらオプション内容を確認することも可能です。
% camel doc kafka | grep autoOffsetRese ║ autoOffsetReset │ What to do when there is no initial offset in ZooKeeper or if an offset is out │ latest │ String ║ %
明日の advent calendar 10日目では、「Camel による Exception/Error Handling」についてご紹介します。 アドベントカレンダーの一覧はこちらです。 qiita.com