Spring Cloud StreamをRabbitMQと組み合わせて、RabbitMQ管理画面から文字列をpublishしてspringアプリケーションでconsumeするだけのサンプルを作成する。
ソースコード・手順
build.gradle
https://start.spring.io/ でCloud Stream, Spring for RabbitMQと後はお好みの依存性を追加してbuild.gradleを生成する。
plugins {
id 'java'
id 'org.springframework.boot' version '3.1.4'
id 'io.spring.dependency-management' version '1.1.3'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
java {
sourceCompatibility = '17'
}
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "2022.0.4")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
dockerでRabbitMQを起動
手軽にpublishするために管理画面を使用したいのでrabbitmq:3ではなくrabbitmq:3-managementを使用する。username/passwordはデフォルト(guest / guest)のままで良い。
sudo docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
consumer
import java.util.function.Consumer; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.Bean; @SpringBootApplication public class SpringCloudStreamSample { public static void main(String[] args) { new SpringApplicationBuilder(SpringCloudStreamSample.class) .web(WebApplicationType.NONE) .run(args); } @Bean public Consumer<String> consume() { return payload -> System.out.println(payload); } }
上記でconsumerを起動する。プロパティファイルが不要な点は後述。
管理画面からpublish
http://localhost:15672/ でRabbitMQ管理画面を開いてデフォルトのusername/passwordでログインする。上記のconsumeというメソッド名によりconsume-in-0というexchangeが自動生成されてるのでそこから適当な文字列をpublishすると、springのconsumerがそれを出力する。
デフォルトで色々自動にやってくれるのでプロパティファイルは無くても起動可能
以下は起動ログの一部だが、consume-in-0.anonymous.jtC4TT2IS_mRDG4XtPEquwというqueueを作ったとか consume-in-0にバインドしたとかlocal_rabbitとかいうバインダーを作っただとか、あれこれ自動生成しているのが分かる。
No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
LiveReload server is running on port 35729
Channel 'application.consume-in-0' has 1 subscriber(s).
Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
Channel 'application.errorChannel' has 1 subscriber(s).
started bean '_org.springframework.integration.errorLogger'
Creating binder: local_rabbit
Constructing binder child context for local_rabbit
Caching the binder: local_rabbit
declaring queue for inbound: consume-in-0.anonymous.jtC4TT2IS_mRDG4XtPEquw, bound to: consume-in-0
Attempting to connect to: [localhost:5672]
Created new connection: rabbitConnectionFactory#24ae5bd5:0/SimpleConnection@4370a547 [delegate=amqp://guest@127.0.0.1:5672/, localPort=51812]
Channel 'rabbit-62463784.consume-in-0.errors' has 1 subscriber(s).
Channel 'rabbit-62463784.consume-in-0.errors' has 2 subscriber(s).
started bean 'inbound.consume-in-0.anonymous.jtC4TT2IS_mRDG4XtPEquw'
Started SpringCloudStreamSample in 2.018 seconds (process running for 2.388)
また、RabbitProperties.javaのusername/passwordあたりのデフォルト値はRabbitMQのdocker runのサンプルと同一になっている。なので一切プロパティが無くても最低限のサンプルコード程度であれば起動する。
package org.springframework.boot.autoconfigure.amqp; @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { private static final int DEFAULT_PORT = 5672; private String host = "localhost"; private Integer port; private String username = "guest"; private String password = "guest";