死信主题处理
启用DLQ
To enable DLQ, a Kafka binder based applications must provide a consumer group via the property spring.cloud.stream.bindings.<binding-name>.group.
Anonymous consumer groups (i.e, where the application does not explicitly provide a group) cannot enable the DLQ feature.
当一个应用程序想要将出错的记录发送到死信队列(DLQ)主题时,该应用程序必须启用 DLQ 功能,因为默认情况下此功能是禁用的。
要启用 DLQ,必须将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq 设置为 true。
当DLQ启用时,如果在处理过程中发生错误且基于spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts属性的所有重试都用尽,则该记录将发送到DLQ主题。
默认情况下,属性max-attempts设置为3。当属性max-attempts大于属性1,且启用dlq时,你会看到重试将遵守属性max-attempts。当未启用任何dlq(这是默认值),则属性max-attempts对重试处理方式没有任何影响。在这种情况下,重试将回退到Spring for Apache Kafka的容器默认值,即10次重试。如果应用程序想要在禁用DLQ时完全禁用重试,那么将属性max-attempts设置为1将不起作用。要在那种情况下完全禁用重试,您需要提供一个ListenerContainerCustomizer,然后使用适当的Backoff设置。这是个例子。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, destinationName, group) -> {
var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
container.setCommonErrorHandler(commonErrorHandler);
};
}
使用此功能,将禁用默认容器行为,不会尝试重试。 如上所述,启用 DLQ 后,绑定器设置将优先于其他设置。
处理死信主题中的记录
因为框架无法预测用户希望如何处理死信消息,因此没有提供任何标准处理机制。<br>如果死信的原因是暂时性的,您可能希望将消息重新路由回原始主题。<br>但是,如果问题是永久性问题,那可能会导致无限循环。<br>此主题中的Spring Boot示例应用程序是一个如何将这些消息重新路由回原始主题的示例,但它会在三个尝试后将它们移动到“停车场”主题。<br>该应用程序是从死信主题读取的另一个spring-cloud-stream应用程序。<br>当5秒钟内未收到任何消息时,它会退出。
示例假设原始目标为 so8400out,消费组为 so8400。
有几种策略要考虑:
-
考虑仅在主应用程序未运行时运行重新路由。否则,非常快速地用完临时错误重试。
-
另外,可以使用两阶段的方法:使用此应用程序对第三个主题进行路由,并使用另一个对从那里返回到主主题。
下面的代码清单显示了示例应用程序:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}