|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
提示、技巧和配方
简单的Kafka死信队列示例
问题说明
作为一名开发者,我想要编写一个消费者应用程序,处理来自Kafka主题的记录。 然而,如果在处理过程中出现某些错误,我不想应用程序完全停止。 相反,我想要将出错的记录发送到DLT(死信主题),然后继续处理新的记录。
解决方案
<ol> <li><p>解决这个问题的方案是使用Spring Cloud Stream中的死信队列(DLQ)功能。</p></li> <li><p>为了讨论的目的,让我们假设我们的处理器函数如下所示。</p></li> </ol>
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
这是一个非常简单的函数,它对处理的所有记录都抛出异常,但您可以扩展此函数以适用于任何其他类似情况。
要将错误记录发送到 DLT,我们需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
要激活 DLQ,应用程序必须提供组名。匿名使用者无法使用 DLQ 功能。我们还需要通过将 Kafka 消费器绑定上的 0 属性设置为 1 来启用 DLQ。最后,我们可以通过在 Kafka 消费器绑定上提供 2 属性来选择性地提供 DLT 名称,否则在这种情况下默认为 3。
注意在上面提供的示例消费者中,负载的类型为byte[]。
默认情况下,Kafka 绑定器中的死信队列(DLQ)生产者期望负载的类型为byte[]。
如果情况并非如此,就需要提供正确的序列化器配置。
例如,让我们将消费者函数重写为如下:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
现在,我们需要告诉Spring Cloud Stream,在写入DLT时,我们希望如何序列化数据。</p><p>这是此方案的修改后的配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
用于具有高级重试选项的死信队列(DLQ)
解决方案
如果您遵循了上面的Recipes,那么在处理遇到错误时,您将获得Kafka绑定器中内置的默认重试选项。
默认情况下,绑定程序重试3次,初始延迟为1秒,每次退避的乘数为2.0,最大延迟为10秒。您可以在下面更改所有这些配置:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果你需要,你也可以通过提供一个布尔值的异常列表来指定可重试的异常。 例如,
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
默认情况下,任何不在上面映射中的异常都会被重试。如果不需要这样做,可以提供来禁用它。
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
您还可以提供自己的 RetryTemplate 并将其标记为 @StreamRetryTemplate,这将由绑定程序扫描并使用。这样在出现错误时可以自动重试,并且有灵活的策略。如果您希望更复杂的重试策略和政策,请参阅下面的内容。
如果您有多个@StreamRetryTemplate个Bean,那么可以使用属性来指定您的绑定想要哪一个。
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
处理死信队列中的反序列化错误
解决方案
正常情况下,Spring Cloud Stream 提供的普通 DLQ 机制在 Kafka 消费者抛出不可恢复的反序列化异常时将无济于事。这是因为在 Consumer 的 poll() 方法返回之前就发生了这个异常。 Spring for Apache Kafka 项目提供了许多出色的方法来帮助绑定器处理这种情况。 让我们探索这些方法。
假设这是我们的函数:<br/>
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
这是一个接受String参数的简单函数。
我们想绕过 Spring Cloud Stream 提供的消息转换器,而使用原生反序列化程序。
在String类型的情况下,这没有太多意义,但对于像 AVRO 等更复杂的类型,你必须依赖外部的反序列化程序,因此想要将转换委托给 Kafka。
现在,当消费者接收到数据时,假设有一条错误记录导致反序列化错误,例如有人传递了Integer而不是String。
在这种情况下,如果你不在应用程序中做些处理,异常将沿着链路传播,最终你的应用程序将会退出。
为了处理这个问题,您可以添加一个ListenerContainerCustomizer@Bean来配置一个DefaultErrorHandler。这个DefaultErrorHandler用DeadLetterPublishingRecoverer进行配置。我们还需要为消费者配置一个ErrorHandlingDeserializer。听起来有很多复杂的事情,但实际上,在这种情况下,它归结为这三个bean。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
让我们逐一分析它们。
第一个是ListenerContainerCustomizer bean,它接受一个DefaultErrorHandler。
容器现在已使用该特定错误处理程序进行了自定义。
此处可以了解有关容器自定义的更多信息。
第二个 bean 是配置为向 DefaultErrorHandler 发布的 DLT。
有关https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current 的更多详细信息,请参阅此处。
第三个bean是DeadLetterPublishingRecoverer,它最终负责向DLT发送数据。默认情况下,DLT主题被命名为ORIGINAL_TOPIC_NAME.DLT。不过你可以更改它。有关更多详细信息,请参阅文档。
我们还需要通过应用程序配置来设置ErrorHandlingDeserializer。
该ErrorHandlingDeserializer委托给实际的反序列化器。出现错误时,它会将记录的关键字/值设置为null,并包含消息的原始字节。然后在标头中设置异常并将此记录传递给监听器,监听器随后调用注册的错误处理程序。
以下是所需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我们正在提供绑定上的ErrorHandlingDeserializer到configuration属性。我们还表明实际要委托的反序列化器是StringDeserializer。
请注意,上述死信队列(dlq)属性与本Recipes中的讨论无关。 它们仅用于解决任何应用程序级别的错误。
Kafka 绑定中的基本偏移量管理
解决方案
我们鼓励您阅读本页上的文档部分,以获得对此的全面理解。
以下是它的gist:
Kafka 默认支持两种类型的偏移量来启动 - earliest 和 latest。它们的语义从名称中就可以明显看出。
假设您是第一次运行使用者。
如果您的 Spring Cloud Stream 应用程序中缺少 group.id,则它会成为一个匿名使用者。
每当存在一个匿名使用者时,Spring Cloud Stream 应用程序默认将从主题分区中的 latest 可用偏移量开始。
另一方面,如果您明确指定了一个 group.id,那么默认情况下,Spring Cloud Stream 应用程序将从主题分区中的 earliest 可用偏移量开始。
在上述两种情况下(具有显式组的使用者和匿名组),可以通过使用属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset并将其设置为earliest或latest来切换起始偏移量。
现在,假设您已经运行过消费者并再次启动它。在这种情况下,上文中的起始偏移语义不适用,因为消费者找到了已提交的消费者组偏移量(对于匿名消费者,尽管应用程序未提供 group.id,但绑定器会为您自动生成一个)。它简单地从上次提交的偏移量继续开始。即使提供了 startOffset 值也是如此。
但是,您可以覆盖默认行为(即消费者从最后提交的偏移量开始),方法是使用resetOffsets属性。为了实现这一点,请将属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets设置为true(这是false的默认值)。然后确保提供startOffset值(可以是earliest或latest)。当你这样操作并启动消费者应用程序时,每次启动都好像这是第一次启动一样,并且会忽略分区的任何已提交偏移量。
在Kafka中定位到任意偏移量
问题说明
使用Kafka绑定器时,我知道可以将偏移量设置为earliest或latest,但我有一个要求是将偏移量定位到中间位置,即任意偏移量。
是否可以通过Spring Cloud Stream Kafka绑定器实现这一目标?
解决方案
之前,我们了解了Kafka绑定器如何处理基本的偏移量管理。 默认情况下,通过我们在上一Recipes中看到的机制,该绑定器不允许您重绕到任意偏移量。 然而,绑定器提供了一些底层策略来实现此用例。 让我们探讨一下它们。
首先,当您要重置到任意偏移量(earliest 或 latest 以外)时,请确保将 resetOffsets 配置保留为其默认值,即 false。
然后,您必须提供一个类型为 KafkaBindingRebalanceListener 的自定义 Bean,该 Bean 将注入所有消费者绑定中。
它是一个接口,并带有一些默认方法,但这里是我们感兴趣的方法:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
让我们看看详细信息。
本质上,每当为主题分区进行初始分配或重新平衡后,此方法都会被调用。
为了更好地说明,假设我们的主题是foo并且它有4个分区。
最初,我们组中只启动一个消费者,该消费者将从所有分区中消费数据。
当消费者第一次启动时,所有4个分区都将被首次分配。
但是,我们并不希望在默认情况下(因为我们定义了一个组),每个分区都从earliest开始消费,而是希望对每个分区,我们在寻找任意偏移量之后才开始消费。
想象一下你有一个业务场景需要如下所示的某些偏移量开始消费。
Partition start offset
0 1000
1 2000
2 2000
3 1000
可以通过如下实现上述方法来达到此目的。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
这只是一个基本的实现。现实世界中的用例比这个复杂得多,你需要相应地进行调整,但这确实给了你一个基本的草图。当消费者seek失败时,它可能会抛出一些运行时异常,你需要决定在这些情况下如何处理。
如果我们使用相同的组 ID 启动第二个消费者怎么办?
当我们添加第二个消费者时,会发生重新平衡,部分分区会被移动。
假设新消费者获得分区 2 和 3。
当这个新的 Spring Cloud Stream 消费者调用此 onPartitionsAssigned 方法时,它会发现这是该消费者上分区 2 和 3 的初始分配。
因此,由于对 initial 参数的条件检查,它将执行 seek 操作。
对于第一个消费者来说,现在它只拥有分区 0 和 1。
然而,对于此消费者而言,这仅仅是一次重新平衡事件,并不被视为初始分配。
因此,由于对 initial 参数的条件检查,它不会重新寻找给定的偏移量。
如何使用 Kafka 绑定手动确认?
解决方案
默认情况下,Kafka 绑定程序会委托给 Spring for Apache Kafka 项目的默认提交设置。
Spring Kafka 中的默认 ackMode 是 batch。
有关详细信息,请参见此处。
在某些情况下,你可能希望禁用此默认提交行为并依赖手动提交。以下步骤可帮助你实现这一点。
将属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode设置为MANUAL或MANUAL_IMMEDIATE。当这样设置时,消费者方法接收到的消息中将包含一个来自KafkaHeaders.ACKNOWLEDGMENT的标题kafka_acknowledgment。
例如,将其视为您的消费者方法。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然后您将属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode设置为MANUAL或MANUAL_IMMEDIATE。
[[如何覆盖 Spring Cloud Stream 中的默认绑定名称?]]<br/>== 如何覆盖 Spring Cloud Stream 中的默认绑定名称?
解决方案
假设以下是您的函数签名。
@Bean
public Function<String, String> uppercase(){
...
}
默认情况下,Spring Cloud Stream 将创建如下绑定。
-
uppercase-in-0
-
uppercase-out-0
您可以使用以下属性覆盖这些绑定。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
之后,所有绑定属性必须使用新名称进行设置,my-transformer-in 和 my-transformer-out。
这里是另一个使用Kafka Streams和多个输入的例子。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
默认情况下,Spring Cloud Stream 将为此函数创建三个不同的绑定名称。
-
processOrder-in-0
-
processOrder-in-1
-
processOrder-out-0
每次您想要在这些绑定上设置一些配置时,都必须使用这些绑定名称。<br>您不喜欢这样,而希望使用更符合领域且可读性更强的绑定名称,例如<br>
-
订单
-
账号
-
增强订单
您可以通过简单地设置这三个属性来轻松实现这一点
-
spring.cloud.stream.function.bindings.processOrder-in-0=orders
-
spring.cloud.stream.function.bindings.processOrder-in-1=accounts
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
完成之后,它会覆盖默认的绑定名称,并且您想要在这些新的绑定名称上设置的任何属性都必须使用这些新的绑定名称。<br>
如何发送消息键作为记录的一部分?
解决方案
通常情况下,您可能需要像映射这样的关联数据结构一样发送带有键和值的记录。Spring Cloud Stream可以以简单直接的方式实现这一点。下面是一个基本蓝图,但您可能希望根据您的特定用例进行调整。
这是一个示例生产者方法(又称为Supplier)。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
这是一个简单的函数,它发送一条负载为String的消息,同时也包含一个键。
请注意,我们使用KafkaHeaders.MESSAGE_KEY将键设置为消息头。
如果您想要更改密钥从默认值 kafka_messageKey,则需要在配置中指定此属性:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
请注意,我们使用绑定名称supplier-out-0作为我们的函数名,请根据实际情况进行更新。
然后,我们在生成消息时使用这个新密钥。
如何使用原生序列化器和反序列化器而不是 Spring Cloud Stream 执行的消息转换?
问题说明
与在Spring Cloud Stream中使用消息转换器不同,我想在Kafka中使用本机序列化程序和反序列化程序。默认情况下,Spring Cloud Stream会使用其内置的消息转换器来处理此转换。如何绕过它并将责任委托给Kafka?
解决方案
这其实很容易做到。
您要做的就是提供以下属性来启用原生序列化。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然后,您还需要设置序列化程序。<br/>有几种方法可以实现这一点。<br/>
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或使用绑定器配置。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用绑定器方式时,它会应用于所有绑定,而将它们设置在绑定上则是每个绑定单独设置。
在反序列化方面,您只需提供反序列化器作为配置即可。
例如,
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
您也可以在绑定器级别上设置它们。
有一个可选的属性,您可以设置它以强制原生解码。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
然而,在Kafka绑定器的情况下,这是不必要的,因为在到达绑定器时,Kafka已经使用配置的反序列化程序对它们进行了反序列化。
解释 Kafka Streams 绑定中的偏移量重置工作方式
解决方案
在我们查看解决方案之前,让我们先看一下以下场景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我们有一个 BiConsumer bean,它需要两个输入绑定。
在这种情况下,第一个绑定用于 KStream,第二个绑定用于 KTable。
当第一次运行此应用程序时,默认情况下,这两个绑定都从 earliest 偏移量开始。
但是,如果由于某些要求我想从 latest 偏移量开始怎么办?
您可以通过启用以下属性来实现这一点。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果您希望只有一个绑定从 latest 偏移量开始,而另一个绑定从默认值 earliest 开始,则请将后者排除在配置之外。
请注意,一旦提交了偏移量,这些设置将< strong >不 < /strong >被遵守,并且已提交的偏移量优先。
跟踪 Kafka 中记录(生产)成功发送的情况
解决方案
假设我们的应用程序中有如下提供商。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然后,我们需要定义一个新的MessageChannelbean来捕获所有成功的发送信息。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下来,在应用程序配置中定义此属性,以提供recordMetadataChannel的bean名称。
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此时,成功发送的信息将被发送到fooRecordChannel。
您可以编写一个IntegrationFlow如下,以查看信息。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在 handle 方法中,有效负载是发送到 Kafka 的内容,消息头包含一个特殊键 kafka_recordMetadata。
它的值是一个 RecordMetadata,其中包含有关主题分区、当前偏移量等信息。
在Kafka中添加自定义标题映射器
解决方案
在正常情况下,这应该是可以的。
想象一下,你有以下的生产者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消费者端,您应该仍然能看到标题“foo”,并且接下来的内容不会给您带来任何问题。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果您在应用程序中提供了< a href="0">自定义标头映射器,那么这将不起作用。
假设您在应用程序中有一个空的< code>1。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果是你的实现,那么在消费者端你将错过foo头部。
有可能你在那些KafkaHeaderMapper方法中包含了一些逻辑。
你需要以下内容来填充foo头部。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
这将正确地从生产者填充foo头到消费者。
关于 id 标题的特别说明
在 Spring Cloud Stream 中,id 头是一个特殊头,但某些应用程序可能希望具有特殊的自定义 ID 头——例如 custom-id 或 ID 或 Id。
第一个(custom-id)将不带任何自定义头映射器从生产者传播到消费者。
但是,如果您使用框架保留的 id 头变体进行生产——例如 ID、Id、iD 等等。那么您将在框架内部遇到问题。
有关此用例的更多上下文,请参见StackOverflow 线程。
在这种情况下,您必须使用自定义 KafkaHeaderMapper 来映射区分大小写的 ID 头。
例如,假设您有以下生产者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
上面的标题Id将从使用方消失,因为它与框架id标题冲突。您可以通过提供一个自定义KafkaHeaderMapper来解决这个问题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
通过这样做,id 和 Id 头部信息都将从生产者侧提供给消费者侧。
在事务中向多个主题发送
解决方案
在 Kafka 绑定中使用事务支持进行事务处理,然后提供一个 AfterRollbackProcessor。为了向多个主题发送消息,请使用 StreamBridge API。
下面是这些代码片段:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
所需配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
要进行测试,可以使用以下内容:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要说明:
请确保您的应用程序配置中没有任何死信队列(DLQ)设置,因为我们手动配置了死信主题(默认情况下,它将发布到名为input.DLT的主题上,基于初始消费者函数)。
另外,请将消费者绑定中的maxAttempts重置为1,以避免绑定程序进行重试。
在上面的例子中,总共最多尝试3次(初次尝试加上FixedBackoff中的两次尝试)。
有关如何测试此代码的更多详细信息,请参阅StackOverflow线程。如果您使用Spring Cloud Stream通过添加更多的消费者函数来测试它,请确保在消费者绑定上设置isolation-level为read-committed。
这个 StackOverflow 线程 与此讨论也相关。
运行多个轮询消费者时应避免的陷阱
解决方案
假设我有以下定义:<br>
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
运行应用程序时,Kafka 消费者会生成一个 client.id(例如 consumer-my-group-1)。对于每个正在运行的应用程序实例,这个 client.id 都是相同的,这会导致意外的问题。
为了解决这个问题,您可以在应用程序的每个实例上添加以下属性:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
有关更多详细信息,请参见GitHub问题。