以下の内容はhttps://rheb.hatenablog.com/entry/cac20251209より取得しました。


Camel Kafka コンポーネント

Apache Camel Advent Calendar 9日目の記事は、サポート担当古市が担当します。 テーマは、「Camel Kafka コンポーネント」

Openshiftなどのクラウド環境では camel-kafka コンポーネントをご利用されているお客様がとても多い印象です。 クライアントや他サービスといった前段からのリクエストをルーティングするといった用途ではなく、クラウド上で生成されるログなどをルーティングする事例が増えています。

ご利用するにあたり、今回も製品ドキュメントもしくはコミュニティードキュメントを確認していただく方法を強くお勧めします。 ここでは、コミュニティードキュメントを見ていきます。

camel.apache.org

オープンソースコミュニティーならではといった感じですが、camel 2.13 から提供開始された camel-kafka コンポーネントも非常に豊富なオプションを提供しています。 "Example"セクションには、多数サンプルが記述されています。きっとイメージしている使用例が見つかるはずですので、一通り見ていただくと良いです。

検証する際には docker で kafka broker インスタンスを用意するのが一般的な方法かと思います。

https://hub.docker.com/r/apache/kafka

せっかくなので、今回は camel-jbang を使い kafka broker インスタンスを用意してみます。 infra コマンドを使いコンテナの起動などの管理を行えます。 list オプションを使い、kafka に対応しているか確認します。

% camel infra list | grep kafka
 kafka             confluent, redpanda, strimzi            Apache Kafka, Distributed event streaming platform                             
% 

strimzi 実装の kafka broker を起動します

% camel infra run kafka strimzi
Starting service kafka with implementation strimzi
{
  "brokers" : "localhost:9092",
  "getBootstrapServers" : "localhost:9092"
}

別ターミナルで、稼働中のサービスリストを表示します。

 % camel infra ps
 ALIAS             IMPLEMENTATION                DESCRIPTION                                        
 kafka             confluent, redpanda, strimzi  Apache Kafka, Distributed event streaming platform 
% 

getオプションで、サービス接続用のURLを確認可能です。

% camel infra get kafka
{
  "brokers" : "localhost:9092",
  "getBootstrapServers" : "localhost:9092"
}
% 

kafka broker のログを確認することも可能です。

camel infra log kafka

kafka broker が立ち上がったので、疎通確認をしてみます。 camel-jbang では簡単にメッセージを確認できるよう、camel route を用意せずに consumer endpoint だけを指定し、メッセージを受信することも可能です。

camel cmd send --poll --endpoint='kafka:test?brokers=localhost:9092'

別ターミナルで、kafka broker へメッセージを送信します。メッセージ送信も camel-jbang で実現できます。

% camel cmd send --body='Hello World' --uri='kafka:test?brokers=localhost:9092'
2025-08-26 13:43:47.921  29144 ---      kafka://test?brokers=localhost:9092 : Sent (success) (261ms)
% 

メッセージ送信後、camel cmd send -poll を実行していたターミナルでは、このようなログが出力されるはずです。

2025-08-26 13:43:44.905  29120 ---      kafka://test?brokers=localhost:9092 : Poll received (success) (3s282ms)
 Endpoint  kafka://test?brokers=localhost:9092 
 Exchange  (DefaultExchange)  InOnly  526363B4E3B0BA0-0000000000000000 
 Message   (DefaultMessage) 
 Header    (Long)             CamelMessageTimestamp  1756183428168 (2025-08-26 13:43:48 / 0s ago) 
 Header    (RecordHeaders)            kafka.HEADERS  RecordHeaders(headers = [], isReadOnly = false)        
 Header    (Long)                      kafka.OFFSET  12                                                     
 Header    (Integer)                kafka.PARTITION  0                                                      
 Header    (Long)                   kafka.TIMESTAMP  1756183428168                                          
 Header    (String)                     kafka.TOPIC  test                                                   
 Body      (String) (bytes: 11) 
 Hello World 

% 

これで kafka broker の疎通確認ができました。 前回用意した camel-ftp コンポーネントを使った camel route を改修してみます。

% cat Hello.java 
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;

public class Hello extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("file:data?charset=utf-8")
         .split(body().tokenize("\n"))
            .unmarshal().csv()
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:test?brokers=localhost:9092");

        from("kafka:test?brokers=localhost:9092")
            .log("received: ${body}");
    }
}

CSVファイルを読み込んだ後、改行コードを使いメッセージ分割し、それぞれ kafka broker へ jsonフォーマットで送信します。 それとは別に kafka broker からメッセージを読み込み、ログ出力する camel route も用意しました。

実行すると、メッセージを出力するログを確認できます。

% camel run Hello.java

~割愛~

2025-08-26 11:25:22.764  INFO 7312 --- [aConsumer[test]] Hello.java:15                            : received: [["1","Bob","23"]]
2025-08-26 11:25:22.769  INFO 7312 --- [aConsumer[test]] Hello.java:15                            : received: [["2","Nancy","30"]]
2025-08-26 11:25:22.773  INFO 7312 --- [aConsumer[test]] Hello.java:15                            : received: [["3","David","18"]]

検証が終了したら、infra コマンドでサービスを停止します。

% camel infra stop kafka
Shutting down service kafka (PID: 3540)

このサンプルは、業務シナリオとかけ離れているので退屈に感じるかもしれません。ポイントとしては以下になります。

  • ファイルデータを取り込む (camel が提供する豊富な component に置き換え可能です)
  • 取り込んだデータを分割や加工 (フォーマット変換はもちろん、データ修正、条件式を使った分岐も可能です)
  • kafka broker へ送信
  • camel-jbang を使い、簡単に検証

camel-file / camel-kafka コンポーネントの記事を通して camel component を試す際、コミュニティードキュメントを確認後、 camel-jbang を使い試す流れを体感できたかと思います。 皆さんが興味のある camel component を見つけ、camel-jbang を使い試していただけば幸いです。

そうそう、各component にはそれぞれ専用オプションが豊富に用意されています。簡易版とはなりますが、camel-jbang でそれらオプション内容を確認することも可能です。

% camel doc kafka | grep autoOffsetRese
║ autoOffsetReset               │ What to do when there is no initial offset in ZooKeeper or if an offset is out │ latest                  │ String                  ║
% 

明日の advent calendar 10日目では、「Camel による Exception/Error Handling」についてご紹介します。 アドベントカレンダーの一覧はこちらです。 qiita.com




以上の内容はhttps://rheb.hatenablog.com/entry/cac20251209より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14