リファレンスとしては https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#kafka-dlq-processing のあたり。
plugins {
id 'org.springframework.boot' version '2.4.0'
id 'io.spring.dependency-management' version '1.0.10.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
// implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.0'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
test {
useJUnitPlatform()
}
spring.cloud.stream.kafka.binder.brokers=localhost:32770 spring.cloud.stream.bindings.input.group=java-consumer-group spring.cloud.stream.bindings.input.destination=dlqtest spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true spring.cloud.stream.kafka.bindings.input.consumer.dlqName=dlqtest-dlq
enableDlqでdead-letter-queueをONにする。そのtopicはdlqNameで指定。
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SampleApplication { public static void main(String[] args) { SpringApplication.run(SampleApplication.class, args); } }
動作確認用にわざとエラーになるconsumerを用意する。これでdlqtestにpublishすればdlqtest-dlqにメッセージがpublishされる。
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) // deprecated public class SampleListener { @StreamListener(Sink.INPUT) // deprecated public void process(String message) { System.out.println(message); throw new RuntimeException("eee"); } }
なお@StreamListenerと@EnableBindingはdeprecatedになっている。javadocとかリファレンス見ると、functional styleで書いてね、とある。が、それについてはこのエントリは触れない(調べてない)。