|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
<h2>重试和死信处理</h2>
默认情况下,当您在 consumer 绑定中配置重试(例如 0 和 1),这些功能由 binder 完成,无需 listener 容器或 Kafka 消费者参与。
有几种情况更适合将此功能移到侦听器容器中,例如:
-
聚合重试和延迟将超过消费者的
max.poll.interval.ms属性,可能会导致分区重新平衡。 -
您希望将死信消息发布到不同的Kafka集群。
-
您希望向错误处理程序添加重试侦听器。
-
…
如果想将该操作选择移动到审拟审投源实例,可以定义一个实例不及注入平号量,Spring MVC 将在支持 Servlet 3.0+ 环境的时候,将自动报地该实例,具体说æ68明:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
目标解析程序和BackOff是从绑定属性(如果配置)创建的。使用KafkaTemplate使用来自spring.kafka….属性的配置。然后,您可以使用这些来创建自定义错误处理程序和死信发布程序;例如:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
现在,只要单次重试延迟大于消费者的 max.poll.interval.ms 属性即可。
当与多个绑定器一起使用时,'ListenerContainerWithDlqAndRetryCustomizer' bean 被'DefaultBinderFactory'覆盖。 为了使 bean 可用,需要使用'BinderCustomizer'来设置容器自定义器(请参阅 [binder-customizer]):
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}