|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
Kafka 绑定侦听器容器自定义程序
Spring Cloud Stream 提供了强大的消息侦听器容器自定义选项,可通过使用自定义程序来实现。 本节介绍用于 Kafka 的自定义程序接口:0、Kafka 特定扩展 1 以及专用 2。
监听器容器自定义器
Spring Cloud Stream 中的 ListenerContainerCustomizer 是一个通用接口,它允许自定义消息监听容器。
用法
要使用ListenerContainerCustomizer,请在您的配置中创建一个实现此接口的bean:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
return (container, destinationName, group) -> {
// Customize the container here
};
}
ListenerContainerCustomizer 接口定义了以下方法:
void configure(C container, String destinationName, String group);
-
container: 需要自定义的消息监听容器。 -
destinationName: 目的地(主题)的名称。 -
group: 消费者组ID。
Kafka监听器容器自定义程序
接口 KafkaListenerContainerCustomizer 继承自 ListenerContainerCustomizer,用于修改监听器容器的行为,并提供对绑定特定的扩展Kafka消费者属性的访问。
用法
要使用KafkaListenerContainerCustomizer,请在您的配置中创建一个实现此接口的bean:
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
return (container, destinationName, group, properties) -> {
// Customize the Kafka container here
};
}
接口KafkaListenerContainerCustomizer添加了以下方法:
default void configureKafkaListenerContainer(
C container,
String destinationName,
String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
configure(container, destinationName, group);
}
此方法通过额外的参数扩展了基本 configure 方法:
-
extendedConsumerProperties: 扩展的消费者属性,包括Kafka特定的属性。
带有死信队列和重试自定义功能的监听容器
该 ListenerContainerWithDlqAndRetryCustomizer 接口为涉及死信队列(DLQ)和重试机制的场景提供了额外的自定义选项。
用法
要使用ListenerContainerWithDlqAndRetryCustomizer,请在您的配置中创建一个实现此接口的bean:
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
// Access the container here with access to the extended consumer binding properties.
};
}
ListenerContainerWithDlqAndRetryCustomizer 接口定义了以下方法:
void configure(
AbstractMessageListenerContainer<?, ?> container,
String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
-
container: 要自定义的Kafka监听器容器。 -
destinationName: 目的地(主题)的名称。 -
group: 消费者组ID。 -
dlqDestinationResolver:用于解析失败记录的死信队列(DLQ)目标位置的功能。 -
backOff: 重试的退避策略。 -
extendedConsumerProperties: 扩展的消费者属性,包括Kafka特定的属性。
摘要
-
ListenerContainerWithDlqAndRetryCustomizer在启用死信队列(DLQ)时使用。 -
KafkaListenerContainerCustomizer用于 Kafka 特定的自定义,不使用死信队列(DLQ)。 -
基本
ListenerContainerCustomizer用于通用自定义。
这种层次化的方法允许在 Spring Cloud Stream 应用程序中灵活且具体地自定义您的 Kafka 监听器容器。