对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
提示、技巧和Recipes
带有 Kafka 的简单 DLQ
问题陈述
作为一名开发人员,我想编写一个消费者应用程序来处理来自 Kafka 主题的记录。 但是,如果在处理过程中出现一些错误,我不希望应用程序完全停止。 相反,我想将错误的记录发送到 DLT(死信主题),然后继续处理新记录。
溶液
这个问题的解决方案是使用 Spring Cloud Stream 中的 DLQ 功能。 出于本次讨论的目的,让我们假设以下是我们的处理器功能。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
这是一个非常微不足道的函数,它会为它处理的所有记录抛出异常,但您可以采用此函数并将其扩展到任何其他类似情况。
为了将错误的记录发送到 DLT,我们需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
为了激活 DLQ,应用程序必须提供组名称。
匿名使用者无法使用 DLQ 设施。
我们还需要通过设置enableDLQ
Kafka 消费者绑定到true
.
最后,我们可以选择通过提供dlqName
在 Kafka 消费者绑定上,否则默认为error.input-topic.my-group
在这种情况下。
请注意,在上面提供的示例消费者中,有效负载的类型是byte[]
.
默认情况下,Kafka binder 中的 DLQ 生产者需要byte[]
.
如果不是这种情况,那么我们需要提供正确序列化程序的配置。
例如,让我们重写消费者函数,如下所示:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
现在,我们需要告诉 Spring Cloud Stream,在写入 DLT 时我们希望如何序列化数据。 下面是此方案的修改配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
具有高级重试选项的 DLQ
溶液
如果您遵循了上述配方,那么当处理遇到错误时,您将获得 Kafka 绑定器中内置的默认重试选项。
默认情况下,活页夹最多停用 3 次尝试,初始延迟为 1 秒,每次回退为 2.0 乘数,最大延迟为 10 秒。 您可以按如下方式更改所有这些配置:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果需要,还可以通过提供布尔值映射来提供可重试异常的列表。 例如
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
默认情况下,将重试上面地图中未列出的任何异常。 如果不需要,那么您可以通过提供
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
您也可以提供自己的RetryTemplate
并将其标记为@StreamRetryTemplate
活页夹将扫描并使用。
当需要更复杂的重试策略和策略时,这非常有用。
如果您有多个@StreamRetryTemplate
bean,那么你可以使用属性
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
使用 DLQ 处理反序列化错误
溶液
当 Kafka 消费者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的正常 DLQ 机制将无济于事。
这是因为,此异常甚至发生在消费者的poll()
方法返回。
Spring for Apache Kafka 项目提供了一些很好的方法来帮助 Binder 解决这种情况。
让我们来探讨一下。
假设这是我们的函数:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
这是一个简单的函数,需要一个String
参数。
我们想绕过 Spring Cloud Stream 提供的消息转换器,而是想使用本机反序列化器。
在以下情况下String
类型,这没有多大意义,但对于更复杂的类型,如 AVRO 等,您必须依赖外部解序列化器,因此希望将转换委托给 Kafka。
现在,当消费者收到数据时,让我们假设有一个导致反序列化错误的错误记录,也许有人传递了Integer
而不是String
例如。 在这种情况下,如果您不在应用程序中执行某些作,则异常将通过链传播,并且您的应用程序最终将退出。
为了处理这个问题,您可以添加一个ListenerContainerCustomizer
@Bean
配置DefaultErrorHandler
. 这DefaultErrorHandler
配置了DeadLetterPublishingRecoverer
. 我们还需要配置一个ErrorHandlingDeserializer
对于消费者来说。这听起来像很多复杂的事情,但实际上,在这种情况下,它归结为这 3 个豆子。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
让我们逐一分析一下。第一个是ListenerContainerCustomizer
需要DefaultErrorHandler
. 容器现在已使用该特定错误处理程序进行自定义。您可以在此处了解有关容器自定义的更多信息。
第二个 bean 是DefaultErrorHandler
配置了发布到DLT
. 有关更多详细信息,请参阅此处DefaultErrorHandler
.
第三个 bean 是DeadLetterPublishingRecoverer
最终负责发送到DLT
. 默认情况下,DLT
主题被命名为 ORIGINAL_TOPIC_NAME.DLT 的 DLT。不过,您可以更改它。有关更多详细信息,请参阅文档。
我们还需要通过应用程序配置配置 ErrorHandlingDeserializer。
这ErrorHandlingDeserializer
委托给实际的解序列化程序。
如果出现错误,它会将记录的键/值设置为空,并包含消息的原始字节。
然后,它在标头中设置异常,并将此记录传递给侦听器,然后侦听器调用已注册的错误处理程序。
以下是所需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我们正在提供ErrorHandlingDeserializer
通过configuration
属性。
我们还指出,要委托的实际反序列化程序是StringDeserializer
.
请记住,上述 dlq 属性都与此配方中的讨论无关。 它们纯粹用于解决任何应用程序级错误。
Kafka 绑定器中的基本偏移量管理
溶液
我们鼓励您阅读有关此内容的文档部分以全面了解它。
这是它的要点:
Kafka 默认支持两种类型的偏移量开始 -earliest
和latest
.
从他们的名字来看,它们的语义是不言自明的。
假设您是第一次运行消费者。
如果您错过了 Spring Cloud Stream 应用程序中的 group.id,那么它就会成为匿名消费者。
无论何时,您有一个匿名消费者,在这种情况下,Spring Cloud Stream 应用程序默认将从latest
主题分区中的可用偏移量。
另一方面,如果显式指定 group.id,则默认情况下,Spring Cloud Stream 应用程序将从earliest
主题分区中的可用偏移量。
在上述两种情况下(具有显式组和匿名组的使用者),可以使用属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset
并将其设置为earliest
或latest
.
现在,假设您之前已经运行了消费者,现在再次启动它。在这种情况下,上述情况下的起始偏移量语义不适用,因为消费者找到了消费者组的已提交偏移量(对于匿名消费者,尽管应用程序不提供 group.id,但绑定器会自动为您生成一个)。它只是从最后一个提交的偏移量开始。这是真的,即使你有一个startOffset
提供的值。
但是,您可以使用resetOffsets
财产。 为此,请设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets
自true
(即false
默认情况下)。然后确保提供startOffset
值(earliest
或latest
). 当您执行此作,然后启动使用者应用程序时,每次启动时,它都会像第一次启动一样启动,并忽略分区的任何已提交偏移量。
在 Kafka 中寻求任意偏移
问题陈述
使用 Kafka 绑定器,我知道它可以将偏移量设置为earliest
或latest
,但我需要寻找中间某些东西的偏移量,一个任意偏移量。有没有办法使用 Spring Cloud Stream Kafka 绑定器来实现这一点?
溶液
之前我们了解了 Kafka binder 如何允许您处理基本的偏移量管理。 默认情况下,活页夹不允许您倒带到任意偏移量,至少通过我们在该配方中看到的机制是这样。 但是,活页夹提供了一些低级策略来实现此用例。 让我们来探讨一下它们。
首先,当您想要重置为任意偏移量时,除了earliest
或latest
,请务必将resetOffsets
配置设置为其默认值,即false
.
然后,您必须提供类型为KafkaBindingRebalanceListener
,这将被注入到所有消费者绑定中。
这是一个带有一些默认方法的界面,但这是我们感兴趣的方法:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
让我们看看细节。
从本质上讲,每次在主题分区的初始分配期间或重新平衡之后都会调用此方法。
为了更好地说明,让我们假设我们的主题是foo
它有 4 个分区。
最初,我们只启动组中的单个使用者,并且该使用者将从所有分区中使用。
当使用者首次启动时,所有 4 个分区都会被初始分配。
但是,我们不想启动要以默认值 (earliest
由于我们定义了一个组),而不是对于每个分区,我们希望它们在寻求任意偏移量后使用。
假设您有一个业务案例要从某些偏移中使用,如下所示。
Partition start offset
0 1000
1 2000
2 2000
3 1000
这可以通过实现上述方法来实现,如下所示。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
这只是一个基本的实现。
现实世界的用例比这复杂得多,您需要进行相应的调整,但这肯定会为您提供基本的草图。
当消费者seek
失败,它可能会抛出一些运行时异常,您需要决定在这些情况下该怎么做。
[[如果我们用相同的组 ID 开始第二个消费者呢? === 如果我们使用相同的组 ID 启动第二个消费者会怎样?
当我们添加第二个消费者时,将发生重新平衡,并且一些分区将被移动。
假设新的使用者获得分区2
和3
.
当这个新的 Spring Cloud Stream 消费者调用onPartitionsAssigned
方法,它将看到这是分区的初始赋值2
和3
在这个消费者身上。
因此,它将执行寻道作,因为对initial
论点。
对于第一个消费者,它现在只有分区0
和1
但是,对于该消费者来说,这只是一个重新平衡事件,不被视为初始分配。
因此,它不会重新搜索给定的偏移量,因为对initial
论点。
[[如何手动确认使用卡夫卡绑定器?]] == 如何使用 Kafka 绑定器手动确认?
溶液
默认情况下,Kafka 绑定器委托给 Spring for Apache Kafka 项目中的默认提交设置。
默认值ackMode
在Spring,卡夫卡是batch
.
有关这方面的更多详细信息,请参阅此处。
在某些情况下,您希望禁用此默认提交行为并依赖手动提交。 以下步骤允许您做到这一点。
设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode
设置为MANUAL
或MANUAL_IMMEDIATE
.
当它设置为这样时,就会有一个名为kafka_acknowledgment
(从KafkaHeaders.ACKNOWLEDGMENT
) 存在于消费者方法接收的消息中。
例如,将其想象为您的消费者方法。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然后,您将属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode
自MANUAL
或MANUAL_IMMEDIATE
.
[[如何做 i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何覆盖 Spring Cloud Stream 中的默认绑定名称?
溶液
假设下面是函数签名。
@Bean
public Function<String, String> uppercase(){
...
}
默认情况下,Spring Cloud Stream 将创建如下绑定。
-
0 中的大写
-
大写-0
可以使用以下属性将这些绑定重写到某些内容。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
在此之后,必须对新名称创建所有绑定属性,my-transformer-in
和my-transformer-out
.
这是另一个使用 Kafka Streams 和多个输入的示例。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
默认情况下,Spring Cloud Stream 将为此函数创建三个不同的绑定名称。
-
流程订单 in 0
-
流程订购合1
-
流程订单出 0
每次要在这些绑定上设置一些配置时,都必须使用这些绑定名称。 您不喜欢这样,并且希望使用更对域友好且可读的绑定名称,例如,类似的东西。
-
订单
-
帐户
-
enrichedOrders
只需设置这三个属性即可轻松做到这一点
-
spring.cloud.stream.function.bindings.processOrder-in-0=订单
-
spring.cloud.stream.function.bindings.processOrder-in-1=账户
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
执行此作后,它将覆盖默认绑定名称,并且要在其上设置的任何属性都必须位于这些新绑定名称上。
[[如何发送消息密钥作为我记录的一部分?]] == 如何发送消息密钥作为记录的一部分?
溶液
通常需要将关联数据结构(如地图)作为具有键和值的记录发送。 Spring Cloud Stream 允许您以简单的方式做到这一点。 以下是执行此作的基本蓝图,但您可能希望将其调整为特定用例。
这是示例生产者方法(又名Supplier
).
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
这是一个微不足道的函数,它发送一条带有String
payload,但也带有一个键。请注意,我们将键设置为消息头KafkaHeaders.MESSAGE_KEY
.
如果要更改默认键kafka_messageKey
,那么在配置中,我们需要指定这个属性:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
请注意,我们使用绑定名称supplier-out-0
由于这是我们的函数名称,请相应更新。
然后,我们在生成消息时使用这个新键。
[[如何使用-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]]== 如何使用本机序列化器和反序列化程序而不是由 Spring Cloud Stream 完成的消息转换?
问题陈述
我想在 Kafka 中使用本机序列化器和反序列化器,而不是使用 Spring Cloud Stream 中的消息转换器。默认情况下,Spring Cloud Stream 使用其内部内置消息转换器来处理此转换。如何绕过这一点并将责任委托给 Kafka?
溶液
这真的很容易做到。
您所要做的就是提供以下属性以启用本机序列化。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然后,您还需要设置序列化程序。 有几种方法可以做到这一点。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或使用活页夹配置。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用绑定方式时,它适用于所有绑定,而在绑定处设置它们是每个绑定。
在反序列化方面,您只需提供反序列化器作为配置。
例如
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
您还可以在活页夹级别设置它们。
可以设置一个可选属性来强制本机解码。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
但是,对于 Kafka 绑定器,这是不必要的,因为当它到达绑定器时,Kafka 已经使用配置的反序列化器对它们进行了反序列化。
解释偏移重置在 Kafka Streams 绑定器中的工作原理
溶液
在我们查看解决方案之前,让我们先看看以下场景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我们有一个BiConsumer
需要两个输入绑定的 bean。
在这种情况下,第一个绑定是针对KStream
第二个是针对KTable
.
首次运行此应用程序时,默认情况下,两个绑定都从earliest
抵消。
我想从latest
由于某些要求而抵消?
您可以通过启用以下属性来执行此作。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果您只想从一个绑定开始latest
offset 和另一个与默认值的消费者earliest
,则将后者从配置中保留绑定。
请记住,一旦存在已提交偏移量,这些设置将不会被接受,并且已提交偏移量优先。
跟踪 Kafka 中记录的成功发送(生成)
溶液
让我们假设我们在应用程序中有以下提供商。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然后,我们需要定义一个新的MessageChannel
bean 来捕获所有成功的发送信息。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下来,在应用程序配置中定义此属性,以提供recordMetadataChannel
.
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此时,成功发送的信息将发送到fooRecordChannel
.
您可以编写一个IntegrationFlow
如下所示,以查看信息。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在handle
方法,有效负载是发送到 Kafka 的内容,消息头包含一个名为kafka_recordMetadata
.
它的值是RecordMetadata
包含有关主题分区、当前偏移量等的信息。
在 Kafka 中添加自定义标头映射器
溶液
一般情况下,这应该没问题。
想象一下,你有以下生产商。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消费者方面,您仍然应该看到标头“foo”,并且以下内容应该不会给您带来任何问题。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果您在应用程序中提供自定义标头映射器,则这将不起作用。
假设您有一个空的KafkaHeaderMapper
在应用程序中。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果这是你的实现,那么你会错过foo
标头。
很有可能,你可能在其中有一些逻辑KafkaHeaderMapper
方法。
您需要以下内容来填充foo
页眉。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
这将正确填充foo
标头。
id 标头的特别说明
在 Spring Cloud Stream 中,id
header 是一个特殊的标头,但某些应用程序可能希望具有特殊的自定义 ID 标头 - 类似于custom-id
或ID
或Id
.
第一个 (custom-id
)将在没有任何自定义标头映射器的情况下从生产者传播到消费者。
但是,如果使用保留的框架变体进行生产id
header - 例如ID
,Id
,iD
等等,那么你会遇到框架内部的问题。
请参阅此 StackOverflow 线程,了解有关此用例的更多上下文。
在这种情况下,您必须使用自定义KafkaHeaderMapper
以映射区分大小写的 ID 标头。
例如,假设您有以下生产者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
标题Id
以上将从消费方消失,因为它与框架发生冲突id
页眉。
可以提供自定义KafkaHeaderMapper
来解决这个问题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
通过这样做,两者id
和Id
标头将从生产者到消费者端可用。
在事务中生成多个主题
溶液
在 Kafka 绑定器中使用事务支持,然后提供AfterRollbackProcessor
.
为了生成多个主题,请使用StreamBridge
应用程序接口。
以下是为此的代码片段:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
所需配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
为了进行测试,您可以使用以下内容:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要注意事项:
请确保您在应用程序配置上没有任何 DLQ 设置,因为我们手动配置 DLT(默认情况下,它将发布到名为input.DLT
基于初始消费者函数)。
此外,重置maxAttempts
在消费者绑定到1
以避免绑定器重试。
在上面的示例中,它将最多尝试 3 次(初始尝试 + 在FixedBackoff
).
有关如何测试此代码的更多详细信息,请参阅 StackOverflow 线程。
如果您使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将isolation-level
在消费者绑定到read-committed
.
这个 StackOverflow 线程也与此讨论有关。
运行多个可轮询消费者时要避免的陷阱
溶液
假设我有以下定义:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
运行应用程序时,Kafka 消费者会生成一个 client.id(类似于consumer-my-group-1
).
对于正在运行的应用程序的每个实例,此client.id
将是相同的,从而导致意外问题。
为了解决此问题,可以在应用程序的每个实例上添加以下属性:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
有关更多详细信息,请参阅此 GitHub 问题。