Kafka Binder Listener Container Customizers
Spring Cloud Stream provides powerful customization options for message listener containers through the use of customizers.
This section covers the customizer interfaces available for Kafka: ListenerContainerCustomizer, its Kafka-specific extension KafkaListenerContainerCustomizer, and the specialized ListenerContainerWithDlqAndRetryCustomizer.
ListenerContainerCustomizer
The ListenerContainerCustomizer is a generic interface in Spring Cloud Stream that allows customization of message listener containers.
Usage
To use the ListenerContainerCustomizer, create a bean that implements this interface in your configuration:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
return (container, destinationName, group) -> {
// Customize the container here
};
}
The ListenerContainerCustomizer interface defines the following method:
void configure(C container, String destinationName, String group);
-
container: The message listener container to customize. -
destinationName: The name of the destination (topic). -
group: The consumer group ID.
KafkaListenerContainerCustomizer
The KafkaListenerContainerCustomizer interface extends ListenerContainerCustomizer to modify the behavior of the listener container and provides access to the binding-specific extended Kafka consumer properties.
Purpose
Use this customizer when you need to access the binding-specific extended Kafka consumer properties while customizing the listener container.
Usage
To use the KafkaListenerContainerCustomizer, create a bean that implements this interface in your configuration:
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
return (container, destinationName, group, properties) -> {
// Customize the Kafka container here
};
}
The KafkaListenerContainerCustomizer interface adds the following method:
default void configureKafkaListenerContainer(
C container,
String destinationName,
String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
configure(container, destinationName, group);
}
This method extends the base configure method with an additional parameter:
-
extendedConsumerProperties: Extended consumer properties, including Kafka-specific properties.
ListenerContainerWithDlqAndRetryCustomizer
The ListenerContainerWithDlqAndRetryCustomizer interface provides additional customization options for scenarios involving Dead Letter Queues (DLQ) and retry mechanisms.
Purpose
Use this customizer when you need to fine-tune DLQ behavior or implement custom retry logic for your Kafka consumers.
Usage
To use the ListenerContainerWithDlqAndRetryCustomizer, create a bean that implements this interface in your configuration:
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
// Access the container here with access to the extended consumer binding properties.
};
}
The ListenerContainerWithDlqAndRetryCustomizer interface defines the following method:
void configure(
AbstractMessageListenerContainer<?, ?> container,
String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
-
container: The Kafka listener container to customize. -
destinationName: The name of the destination (topic). -
group: The consumer group ID. -
dlqDestinationResolver: A function to resolve the DLQ destination for a failed record. -
backOff: The backoff policy for retries. -
extendedConsumerProperties: Extended consumer properties, including Kafka-specific properties.
Summary
-
ListenerContainerWithDlqAndRetryCustomizeris used if DLQ is enabled. -
KafkaListenerContainerCustomizeris used for Kafka-specific customization without DLQ. -
The base
ListenerContainerCustomizeris used for generic customization.
This hierarchical approach allows for flexible and specific customization of your Kafka listener containers in Spring Cloud Stream applications.