以下の内容はhttps://techblog.raksul.com/entry/2025/02/14/134718より取得しました。


Kafka と Karafka を無事故・無停止でアップグレードした話

はじめに

こんにちは。SRE チームの吉原です。

ラクスルでは様々なサービスが Rails で動いており、そのメッセージングキューにに AWS MSK (Kafka) を利用しています。このときの Kafka は 2.7.0 で 2024/9/11 に強制的にバージョンアップを控えていました。Ruby の Kafka Client である Karafka は 1.4 を使っており、しばらくアップデートされていませんでした。2023 年 8 月には Karafka にまつわる大きな障害が発生しており、入社時はこの問題について調査をしていました。この結果、ログや可観測性の問題がわかっていました。Karafka 1 系は ruby-kafka をベースとしており、このライブラリも積極的に開発されていないため、Kafka が 3.5.0 になったときの互換性や安定性については不安がありました。Karafka 2 系は librdkafka をベースをしているので安定性や性能面でも期待が持てます。ここでは様々なサービス間で複雑に入り組む全ての Producer / Consumer アップグレードに加え、無事故・無停止で Kafka のメジャーバージョンをアップグレードしたプロジェクトについて紹介します。

進め方

まずは大きな進め方を決めます。一度に全部はできないので、一つ一つのコンポーネントをインクリメンタルに、なるべくロールバックできるように進めていきます。

  1. 利用状況の調査
  2. テスト環境の構築
  3. Kafka / Karafka を手元で動かす
  4. Karafka 2.4 までの変更内容の調査
  5. Karafka アップグレード (1.4 → 2.4)
  6. Kafka アップグレード (2.7.0 → 3.5.1)

Kafka を初めて触るメンバーも多かったことから、手元で Kafka を触る環境を作って慣れてもらうようにしたり、私自身も Ruby で Kafka を使うのは初めてだったので、Karafka の動きを把握する必要がありました。

Kafka の利用状況

社内のサービス全てのコードを調査したところ、11 サービスにわたって利用されていることがわかりました。それぞれのサービスで実装している Consumer と Producer をリストアップして対応関係をリストアップしました。

kafka usage relation

メッセージは Protocol Buffers でシリアライズされていました。Producer / Consumer 間通信のデータ互換性については、Karafka がメッセージをそのまま送る限り維持されそうです。

テスト環境の構築

最終的に AWS のコンソールからアップグレードをしないといけないため、Kafka はローカルではなく AWS 上に構築しました。現状の Kafka バージョンである 2.7.0 とアップグレード後の 3.5.1 の2つを準備しました。Kafka の認証には SASL SCRAM を使っているため、このあたりの設定も本番環境と同様にしています。

Kafka / Karafka を手元で動かす

端末から Kafka を使う

ここでは詳細は省きますが、手元の端末から直接通信できるように VPN などをセットアップしておいてください。Kafka の SASL 設定なども同様に省略します。

Mac から使うには Homebrew で Kafka を入れます。

$ brew install kafka

インストールすると kafka-console-consumer , kafka-console-producer , kafka-topics などのコマンドが利用可能になります。

次に環境変数を設定します。KAFKA_HEAP_OPTS はヒープメモリ不足で Java が起動しないことがあるので、あらかじめ多めに設定しておきます。

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export BOOTSTRAP_SERVER_270="kafka://b-1.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096,kafka://b-2.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096,kafka://b-3.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096"
export BOOTSTRAP_SERVER_351="kafka://b-1.peukafka351.kn5d8s.c4.kafka.ap-northeast-1.amazonaws.com:9096,kafka://b-2.peukafka351.kn5d8s.c4.kafka.ap-northeast-1.amazonaws.com:9096,kafka://b-3.peukafka351.kn5d8s.c4.kafka.ap-northeast-1.amazonaws.com:9096"

client.properties を設定します。SASL の設定とユーザ名・パスワード等の設定を書きます。bootstrap server は client.properties にも書けますが、ここではよく切り替えるため設定からはあえて抜いています。ただ長くて視認性が悪いので環境変数に入れておきます。

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="peu" \
   password="peu-password";

これで設定は完了です。

手元で動作確認をしてみます。

トピックの一覧

$ kafka-topics --bootstrap-server ${BOOTSTRAP_SERVER_270} --command-config client.properties --list
__amazon_msk_canary
__consumer_offsets

トピックを作成する

$ kafka-topics --bootstrap-server ${BOOTSTRAP_SERVER_270} --command-config client.properties --create --topic yoshihara-test

Created topic yoshihara-test.

Consumer を起動する

$ kafka-console-consumer --bootstrap-server ${BOOTSTRAP_SERVER_270} --consumer.config client.properties --topic yoshihara-test

起動すると producer から送られた文字列が表示されるはずです。

Producer を起動する

$ kafka-console-producer --bootstrap-server ${BOOTSTRAP_SERVER_270} --producer.config client.properties --topic yoshihara-test
>

起動すると > が表示され入力待ちになります。

jq を使って読みやすくする

例えば Producer にこのような JSON 文字列が送られたとします。

>{"name":"json"}
>{"hoge":1234,"fuga":[1,2,3]}

これを jq を使ってパイプで渡してあげると、以下のように送られてきた JSON 文字列が整形されて色付けされるので読みやすくなります。

$ kafka-console-consumer --bootstrap-server ${BOOTSTRAP_SERVER_270} --consumer.config client.properties --topic yoshihara-test | jq
{
  "name": "json"
}
{
  "hoge": 1234,
  "fuga": [
    1,
    2,
    3
  ]
}

Rails Console で動かす (Karafka 1.4)

ローカルマシンのコンソールで最低限の Produce / Consume の動作が見られたので、今度は実際のアプリケーションから Rails Console や Karafka サーバを動かしてみます。

Produce

bin/rails c で Rails Console を起動します。

そこで WaterDrop から指定したトピックにメッセージを送ることができます。

pry(main)> WaterDrop::SyncProducer.call(SecureRandom.uuid.to_s, topic: "yoshihara-test")

成功するとこんなログが出ます。

Current leader for yoshihara-test/0 is node b-1.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096 (node_id=1)
Sending 1 messages to b-1.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096 (node_id=1)
[produce] Sending produce API request 9 to b-1.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096
[produce] Waiting for response 9 from b-1.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096
[produce] Received response 9 from b-1.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096
Successfully appended 1 messages to yoshihara-test/0 on b-1.peukafka270.ddtif9.c4.kafka.ap-northeast-1.amazonaws.com:9096 (node_id=1)

Console Consumer で送られた文字列が受け取れることを確認します。

$ kafka-console-consumer --bootstrap-server ${BOOTSTRAP_SERVER_270} --consumer.config client.properties --topic yoshihara-test
e1f8c6a8-b34d-46dd-a299-c6df1d76e8ae
4fd96a75-94f2-4b73-adfb-98557c294e4b
06472c27-b900-4ba8-9b32-799757b1b1ed
68bfe0d1-8ca7-4042-9db6-bdc09628d622

Consume

連携テストのため、DummyConsumer を作成し、受信したメッセージをログに出すだけの Consumer をつくります。karafka.rb を修正して Consumer を読み込みます。

class DummyConsumer < ApplicationConsumer
  def consume
    super do
      Rails.logger.info(params.raw_payload)
    end
  end
end

これで bundle exec karafka server から Consumer を立ち上げます。

次に DummyConsumer が Consume しているトピックに対して、 Console から Produce します。

すると以下のように karafka server のログが出力されるはずです。

25fa4dd8-7249-474b-9567-28eb7547cda1
Inline processing of topic yoshihara-test with 1 messages took 39 ms
1 messages on yoshihara-test topic delegated to DummyConsumer

Karafka 2.4 までの変更点

これで既存の Kafka / Karafka の動きを把握することができました。次に Karafka の最新バージョンである 2.4 までの変更点を調べます。

Karafka 公式サイトに親切にもほとんどの変更点がまとめられています。

ここでは我々のユースケースで対応する必要があった主な変更点と、公式で言及されていない大切な変更点を紹介します。この他にもフォアグラウンドで動かすようにしたり、chrono logger を削除して Ruby Logger にするなど様々な変更を加えていますがここでは詳細は省きます。

waterdrop.rb の削除

karafka.rb でコンフィグレーションされるので Karafka 2.x 系では不要になりました。もはや二重に設定を書く必要がありません。

Responder 廃止

2.x 系では Responder がなくなり、Karafka.producer に統一されました。

Puma / Sidekiq などのシャットダウンフック

ワーカー終了時に Karafka.producer.close を呼ぶ必要があります。

Puma では以下のようにします。

 on_worker_shutdown do
   ::Karafka.producer.close
 end

Sidekiq では以下ようにします。

  config.on(:shutdown) do
    ::Karafka.producer.close
  end

Produce Syntax の変更

1.4 系の既存実装は以下のようになっていると思います。

WaterDrop::SyncProducer.call(
  my_payload,
  topic: Settings.kafka.topics.foo,
)

2 系では以下のようにする必要があります。

Karafka.producer.produce_sync(
  payload: my_payload,
  topic: Settings.kafka.topics.foo
)

Responder を WaterDrop Producer に変更する

Responder は使えなくなりました。WaterDrop Producer に書き換える必要があります。

Kafka 設定の変更

1.4 では以下のように設定していたのものは

config.kafka.seed_brokers = Settings.kafka.urls
config.kafka.sasl_scram_username = Settings.kafka.sasl_scram_username
config.kafka.sasl_scram_password = Settings.kafka.sasl_scram_password
config.kafka.sasl_scram_mechanism = Settings.kafka.sasl_scram_mechanism
config.kafka.ssl_ca_certs_from_system = Settings.kafka.ssl_ca_certs_from_system

2.4 では以下のようにする必要があります。

config.kafka.merge!({
    'security.protocol': Settings.kafka.security_protocol,
    'bootstrap.servers': Settings.kafka.bootstrap_servers,
    'sasl.username': Settings.kafka.sasl_scram_username,
    'sasl.password': Settings.kafka.sasl_scram_password,
    'sasl.mechanisms': Settings.kafka.sasl_scram_mechanism,
})

それぞれ見ていきます。

url が廃止され bootstrap_servers に変更

URL スキームがついたリソースアドレス形式ではなく、client.properties に書くような形式となり、ホスト名とポート番号をカンマ区切りで表現します。

ssl_ca_certs_from_system が不要に

設定からは削除します。

sasl_scram_mechanism が変更

sasl_scram_mechanismsha512 を利用している場合は sasl.mechanismsSCRAM-SHA-512 と入力する必要があります。

security.protocol を追加

security.protocolSASL_SSL とする必要があります。

Karafka::BaseConsumer#consume がメッセージのリストが渡されるようにになった

横展開の項目で詳説します。

テストの変更

主な変更点は以下のとおりです。

  • RSpec.configurerequire 'karafka/testing/rspec/helpers’ をして config.include Karafka::Testing::RSpec::Helpers をする。
  • karafka_consumer_for から karafka.consumer_for に書き換える。
  • allow(WaterDrop::AsyncProducer).to receive(:call) から allow(Karafka.producer).to receive(:produce_sync) に書き換える。
  • .with() の payload も Produce と同様のシンタックスに変更となっています。

Consumer Group 名の変更

この情報が公式には明記されていないので注意が必要です。Consumer Group が違うとオフセット情報が引き継がれません。Client の設定にもよりますが、デフォルトでは Kafka が保持する最も古いオフセットのメッセージから読み込むことになります。

Karafka 2 系へアップグレードするとき、 Consumer Group を同一に保つには #{client_id}_#{topic_name} へ設定する必要があります。以下にコードサンプルを示します。

  routes.draw do
    consumer_group "#{config.client_id}_#{Settings.kafka.topics.hoge}" do
      topic Settings.kafka.topics.hoge do
        consumer HogeConsumer
        manual_offset_management true
      end
    end
  end

Karafka アップグレード

Karafka 2.4 までの差分がわかったところで、これらを既存サービスに適用し、 karafka server が動作するところまでを確認しました。しかし実際の Rails アプリケーション上でメッセージをやり取りしていないので、なるべく実際の動作に近い環境で動かしてみます。

Consumer はログを出力するだけのダミーを作り、Produce は Rails Console から手で Produce して動作を確認します。これを様々なバージョンでマトリックスを書いて動作を検証します。その表が以下になります。

Karafka Version Kafka Version 確認方法 確認済
karafka 1.4 Kafka 2.7.0 WaterDrop produce + console consume
karafka 1.4 Kafka 3.5.1 WaterDrop produce + console consume
karafka 1.4 Kafka 2.7.0 karafka server + console produce (+WaterDrop produce)
karafka 1.4 Kafka 3.5.1 karafka server + console produce (+WaterDrop produce)
karafka 2.3 Kafka 2.7.0 WaterDrop produce + console consume
karafka 2.3 Kafka 3.5.1 WaterDrop produce + console consume
karafka 2.3 Kafka 2.7.0 karafka server + console produce (+WaterDrop produce)
karafka 2.3 Kafka 3.5.1 karafka server + console produce (+WaterDrop produce)
karafka 2.4 Kafka 2.7.0 WaterDrop produce + console consume
karafka 2.4 Kafka 3.5.1 WaterDrop produce + console consume
karafka 2.4 Kafka 2.7.0 karafka server + console produce (+WaterDrop produce)
karafka 2.4 Kafka 3.5.1 karafka server + console produce (+WaterDrop produce)

この動作を検証をしているときはまだ Karafka 2.3 だったので、途中から 2.4 を加えています。

パイロットプロジェクトでのアップグレード

クライアントやサーバのバージョンが違っても機能互換性が保たれているのを確認できました。いよいよ実際のサービスに適用していきます。まず依存関係の少ないサービスを一つ選び、これをパイロットプロジェクトとして選択し、先んじてアップグレードを試みます。

アップグレードしたところ、古いメッセージから再度 Consume してしまいました。理由は Consumer Group 名が変わってしまい、オフセット情報が引き継がれていなかったためです。幸いメッセージが重複してもサービスに影響はありませんでした。Consumer Group 名の引き継ぎ方法は前述の通りです。

このサービスでのアップグレードにより、段階を踏まずとも 1 PR ですぐ適用できることもわかりました。他サービスも同様に一気にアップグレードすることにしました。ただしどのライブラリやフレームワークでも同じですが、アップグレードは major version と minor version ごとに行うべきとされています。もちろん Karafka においてもそれが推奨されています。

横展開までの準備

冒頭で説明した通り、Karafka の動作に関しては可観測性に問題がありました。どのようなオフセット情報を持ったメッセージをいつ受信し、どんなビジネスロジックが実行されて成功もしくは失敗したのかなど、アプリケーションの動作状況がよくわかっていませんでした。また #consume のシンタックスも変更されて複数メッセージを受け取るように変更されたので、既存実装をなるべく変更せず移行できるようにする必要もありました。

そこで SRE チームでは全ての Karafka 実装に共通した基底クラスを作成しました。既存の Consumer 実装クラスは、このクラスを継承して #consume_one を実装します。すると Karafka 1 系と同じように 1 メッセージごとにアクセスできるので、コードの変更は Produce のシンタックス変更だけで済ますことができます。以下に参考実装を示します。

class ApplicationConsumer < Karafka::BaseConsumer
  attr_reader :message

  def initialize
    Karafka.logger.info("#{self.class} Initialized.")
  end

  def consume
    Karafka.logger.info("#{self.class} consumed #{messages.size} message(s) from topic: #{messages.metadata.topic}. " \
                        "offset: #{messages.metadata.first_offset} / #{messages.metadata.last_offset} ")
    messages.each do |message|
      @message = message
      Karafka.logger.info(
        "#{self.class} is processing a message. timestamp: #{message.metadata.timestamp}, " \
        "received_at: #{message.metadata.received_at}, partition: #{message.metadata.partition}, " \
        "offset: #{message.metadata.offset}, " \
        "raw_payload: '#{message.raw_payload.force_encoding('UTF-8')}'"
      )
      consume_one
      Karafka.logger.info(
        "#{self.class} processed successfully. partition: #{message.metadata.partition}, " \
        "offset: #{message.metadata.offset}"
      )
      mark_as_consumed(message)
    end
  rescue StandardError => e
    Karafka.logger.error(e)
  end

  def shutdown
    Karafka.logger.info("#{self.class} is shutting down...")
  end
end

処理の前後でメッセージのメタデータをログに出力し、動作をわかりやすくしています。他のサービスでも同じフォーマットで出力されるので、SRE 観点でもログパーサーを再利用できるようになるメリットもあります。

Consumer 設定で manual_offset_management true にしているので、それぞれの message に対して mark_as_consumed(message) を呼びます。この処理もサービス側は認識することがないので、これまでと同じ実装を維持できます。

なお Karafka.logger のログレベルは :debug にしておくことで produce 時のメッセージもログに出力することができます。

全サービスへの展開

この基底クラスと必要な変更を全サービスに適用し、SRE チームで Pull Request を作成しました。デプロイ時はサービス開発者と共にログを tail しながら Kafka を使う動作を行ってもらってちゃんと動作するのか検証しました。パイロットプロジェクトで問題点が洗い出されていたので、大きな問題が出ることなく適用できました。

Kafka アップグレード

いよいよ Kafka をアップグレードをするフェーズに来ました。アップグレード後の動作は機能テストで確認することはできますが、AWS がアップグレードしているときの動作がどうなるかは公開されていないため、アップグレードにはどのくらい時間がかかるのか、ダウンタイムがあるのかなどが不明でした。アップグレード後に Karafka は再接続をして新しいバージョンと接続できるのかも不安がありました。

そこで Rails Console で 1 秒ごとに Produce しながら、DummyConsumer で 1 秒ごとに Consume してアップグレード中の挙動を見ることにしました。

アップグレードの動作を観察する

まずアップグレード用の Kafka のテストクラスタを構築します。

次に以下のように Rails Console から毎秒 Produce させておきます。

> loop { Karafka.producer.produce_sync(payload: Time.now.to_s + ' こんにちは', topic: "yoshihara-test"); sleep(1); }

karafka server を立ち上げて DummyConsumer でこのメッセージが受け取れるか観察します。

次に AWS Console の UI 上から、アップグレードボタンを押下します。すると以下のような画面でアップグレードする対象バージョンを指定するプルダウンが現れます。

Kafka upgrade dropdown

アップグレードを開始するとこのような画面になります。

アップグレードプロセス中にクラスターからの読み取りとクラスターへの書き込みは続行できます

と書いてあります。さて本当でしょうか。

Kafka upgrade started

37 分後、初期化が完了しました。

Kafka upgrade initialized

1 時間 10 分後、ファイナライズフェーズに入りました。

Kafka upgrade finalize

切り替わりが発生したタイミング (83% になったタイミング) で既存の接続は一旦切断されました。この後も数回切断がありましたが、クライアントはブローカーへ再接続を行い、すぐに Consume が開始されました。

その間でメッセージのロストがないことを確認しました。オフセットは保持されているため、再接続後は続きのメッセージから consume を始めていました。

1 時間 20 分後、アップグレードが完了しました。

Kafka Upgrade completed

クラスターの概要を見てバージョンを確認してみると 3.5.1 になっていました。

Kafka 3.5.1 confirm

これによりブラックボックスだった Kafka アップグレード作業がなんとなくわかりました。作業時間の目安がついたので、サービス開発者にどの程度の作業時間を要するのか伝えることができるのも大きい収穫です。

社内テスト環境への適用

サービス開発者に社内テスト環境の Kafka のアップグレードをアナウンスし、日時を調整をしました。

当日は Slack のハドルに集まってそれぞれのスクリーンをシェアし、各サービスのログを見ながら異常がないかすぐに状況がわかるようにスタンバイをしました。

結果としては何も問題なくスムーズにアップグレードを完了させることができました。その後は念の為少し期間テスト環境で動かし続けて様子を見ました。

すると Karafka がアップグレードされていないサービスが見つかりました。アップグレード作業自体は定型化されているのですぐに変更を適用できました。

本番環境への適用

テスト環境でのアップグレード後も問題がなかったので、本番にも適用することにしました。

テスト環境のときと同様にサービス開発者と共にアプリケーションログを見ながら異常がないか確認しながら実施しました。

本番でもアップグレードは問題なく完了しました。このブログを書くまでだいぶ時間は立ちましたが、今日まで極めて安定してサービスを運用できています。

まとめ

Kafka のアップグレード自体は終わってみるとあっけないものです。しかしここに至るまでに多くの手順を踏んで確認したことにより、無事故・無停止で Kafka と Karafka のアップグレードを行うことができました。

We are hiring!

ラクスルでは成長するビジネスを支える SRE を募集しています。 https://hrmos.co/pages/raksul/jobs/rksl866q_2




以上の内容はhttps://techblog.raksul.com/entry/2025/02/14/134718より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14