RabbitMQはじめて触ったけどよく分からん、ということでチュートリアルを読むついでにテキトーに訳すことにした。
RabbitMQチュートリアル(https://www.rabbitmq.com/getstarted.html)の4 Routing Java(https://www.rabbitmq.com/tutorials/tutorial-four-java.html)をテキトーに訳した。
Routing
(using the Java client)
先のチュートリアルではシンプルなロギングシステムを作りました。これで複数のレシーバにログメッセージをブロードキャスト可能になりました。
このチュートリアルでは更に機能追加を行い、メッセージのサブセットのみサブスクライブ可能にします。たとえば、クリティカルなエラーメッセージのみログファイルに向ける一方で、コンソールにはログメッセージをすべて出力できるようにします。
Bindings
先の例でバインディングの作成方法については解説しました。バインディングのコードは以下のようになります。
channel.queueBind(queueName, EXCHANGE_NAME, "");
バインディングはエクスチェンジとキューとの関連付けです。つまり、キューはエクスチェンジからのメッセージのみ受け付けます。
バインディングはルーティングキー(routingKey)パラメータも取ることが出来ます。basic_publishのパラメータとの混乱を避けるため、バインディングのパラメータはバインディングキー(binding key)と呼ぶことにします*1。キーと一緒にバインディングを作るには以下のようにします。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
バインディングキーの意味付けはエクスチェンジタイプに依存します。fanoutエクスチェンジでは前に使用したとおり無視されます。
Direct exchange
先のチュートリアルのロギングシステムではすべてのコンシューマーにすべてのメッセージをブロードキャストしていました。今回は重要度(severity)に応じてメッセージのフィルタリングが可能なように拡張します。たとえば、警告や情報のログメッセージでディスク容量を浪費しないよう、クリティカルなエラーを受信するときにだけディスクにログメッセージを書き出すようにします。
fanoutエクスチェンジにはそのような柔軟性は無く、単なるブロードキャストの機能しかありません。
その代わりにdirectエクスチェンジを使います。directエクスチェンジのルーティングアルゴリズムはシンプルで、メッセージのルーティングキーにマッチするバインディングキーを持つキューにメッセージを送信します。
これの解説のために以下のような設定を行います。

この設定では、directエクスチェンジXに二つのキューをバインドします。前者のキューはバインディングキーorangeでバインドし、後者は二つのバインディングを持ち、一つはバインディングキーblackでもう片方はgreenです。
この設定では、エクスチェンジにパブリッシュされるメッセージのうち、ルーティングキーorangeを持つメッセージはキューQ1にルーティングされます。ルーティングキーがblackないしgreenのメッセージはQ2に行きます。それ以外のすべてのメッセージは破棄されます。
Multiple bindings

同一のバインディングキーで複数のキューをバインドするのは全く問題ありません。上記のようにXとQ1はバインディングキーblackでバインディングを作成できます。この場合、directエクスチェンジはfanoutのように振る舞い、キーにマッチするすべてのキューにメッセージをブロードキャストします。ルーティングキーblackを持つメッセージはQ1とQ2に配信されます。
Emitting logs
チュートリアルのロギングシステムでは上記のモデルを使います。fanoutではなくdirectエクスチェンジにメッセージを送信します。ログの重要度はルーティングキーで指定します。これにより、受信側のプログラムは受信したい重要度のログを選択可能になります。まずログ送信側を見て行きます。
これまで通り、まずエクスチェンジを作成します。
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
次に、メッセージ送信の開始は以下のようにします。
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
説明の簡略化のため、'severity'には'info', 'warning', 'error'のいずれか一つを指定するものとします。
Subscribing
メッセージ受信は以前のチュートリアルとほぼ同じですが、一つ違いがあり、受信したい重要度ごとにバインディングを作成します。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
Putting it all together

EmitLogDirect.javaクラスのコードです。
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //.. }
ReceiveLogsDirect.javaのコードです。
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
これまで通りコンパイルを行います(コンパイルとクラスパスについてはtutorial one を参照)。説明簡略化のため、サンプル実行時にはクラスパスに環境変数$CP(windowsでは%CP%)を使います。
ファイルへのログメッセージは'warning'と'error'('info'は除く)のみにしたい場合、コンソールで以下のように実行します。
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
画面上ですべてのログメッセージを参照したい場合、以下のように実行します。
$ java -cp $CP ReceiveLogsDirect info warning error [*] Waiting for logs. To exit press CTRL+C
次に、例えば、errorログメッセージの送信は以下のように行います。
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.'
(すべてのソースコードは(EmitLogDirect.java source)と(ReceiveLogsDirect.java source)です)
パターンに応じてメッセージをリッスンする方法についてはtutorial 5に進んで下さい。