对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
错误处理
Apache Kafka Streams 提供了本机处理反序列化错误异常的功能。
有关此支持的详细信息,请参阅此。
开箱即用,Apache Kafka Streams 提供了两种反序列化异常处理程序 -LogAndContinueExceptionHandler
和LogAndFailExceptionHandler
.
顾名思义,前者将记录错误并继续处理下一条记录,后者将记录错误并失败。LogAndFailExceptionHandler
是默认的反序列化异常处理程序。
在 Binder 中处理反序列化异常
Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两个反序列化异常处理程序外,绑定器还提供了第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。 下面是启用此 DLQ 异常处理程序的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
设置上述属性后,反序列化错误中的所有记录都会自动发送到 DLQ 主题。
您可以设置发布 DLQ 消息的主题名称,如下所示。
您可以为DlqDestinationResolver
这是一个功能接口。DlqDestinationResolver
需要ConsumerRecord
并将异常作为输入,然后允许指定主题名称作为输出。
通过访问 KafkaConsumerRecord
,则可以在BiFunction
.
下面是一个提供实现的示例DlqDestinationResolver
.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在提供实现时要记住的一件重要事情DlqDestinationResolver
是 binder 中的 provisioner 不会为应用程序自动创建主题。
这是因为活页夹无法推断出实现可能发送到的所有 DLQ 主题的名称。
因此,如果使用此策略提供 DLQ 名称,则应用程序有责任确保事先创建这些主题。
如果DlqDestinationResolver
作为 bean 存在于应用程序中,具有更高的优先级。
如果不想遵循此方法,而是使用配置提供静态 DLQ 名称,则可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此设置,则错误记录将发送到主题custom-dlq
.
如果应用程序未使用上述任何一种策略,则它将创建一个名称为error.<input-topic-name>.<application-id>
.
例如,如果绑定的目标主题是inputTopic
应用程序 ID 为process-applicationId
,则默认的 DLQ 主题为error.inputTopic.process-applicationId
.
如果您打算启用 DLQ,则始终建议为每个输入绑定显式创建 DLQ 主题。
DLQ 每个输入使用者绑定
该物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用。
这意味着如果同一应用程序中有多个函数,则此属性将应用于所有函数。
但是,如果单个处理器中有多个处理器或多个输入绑定,则可以使用绑定程序为每个输入使用者绑定提供的更细粒度的 DLQ 控件。
如果您有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且您只想在第一个输入绑定上启用 DLQ 并在第二个绑定上启用 skipAndContinue,然后您可以在使用者上执行以下作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以这种方式设置反序列化异常处理程序的优先级高于在绑定程序级别设置的优先级。
DLQ 分区
默认情况下,记录使用与原始记录相同的分区发布到死信主题。 这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请添加一个DlqPartitionFunction
实现为@Bean
到应用程序上下文。
只能存在一个这样的豆子。
该函数与消费者组一起提供(在大多数情况下与应用程序 ID 相同),失败ConsumerRecord
和例外。
例如,如果您始终想要路由到分区 0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions 属性设置为 1(以及活页夹的minPartitionCount 等于1 ),无需提供DlqPartitionFunction ;框架将始终使用分区 0。
如果将使用者绑定的dlqPartitions 属性设置为大于1 (或活页夹的minPartitionCount 大于1 ),您必须提供一个DlqPartitionFunction bean,即使分区计数与原始主题的分区计数相同。 |
在 Kafka Streams 绑定器中使用异常处理功能时,需要记住几件事。
-
该物业
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用。 这意味着如果同一应用程序中有多个函数,则此属性将应用于所有函数。 -
反序列化的异常处理与本机反序列化和框架提供的消息转换一致。
在活页夹中处理生产异常
与上述对反序列化异常处理程序的支持不同,绑定程序不提供用于处理生产异常的第一类机制。
但是,您仍然可以使用StreamsBuilderFactoryBean
定制器,您可以在下面的后续部分中找到更多详细信息。
运行时错误处理
在处理来自应用程序代码的错误时,即来自业务逻辑执行的错误,通常由应用程序来处理。
因为,Kafka Streams 绑定器没有办法干扰应用程序代码。
但是,为了使应用程序更容易一些,活页夹提供了一个方便的RecordRecoverableProcessor
,您可以使用它来指定如何处理应用程序级错误。
请考虑以下代码。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
如果您的业务代码map
上述调用会抛出异常,您有责任处理该错误。
这是哪里RecordRecoverableProcessor
变得方便。
默认情况下,RecordRecoverableProcessor
,将简单地记录错误并让应用程序继续前进。
假设你想要将失败的记录发布到 DLT,而不是在应用程序中处理它。
在这种情况下,您必须使用RecordRecoverableProcessor
叫DltAwareProcessor
.
以下是您可以做到这一点的方法。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
原始业务逻辑代码map
立即调用已作为KStream#process
方法调用,该调用采用ProcessorSupplier
.
然后,我们将自定义DltAwareProcessor,
能够发布到 DLT。
的构造函数DltAwareProcessor
以上采用三个参数 - 一个Function
它将输入记录和业务逻辑作作为Function
正文、DLT 主题,最后是DltPublishingContext
.
当Function’s lambda expression throws an exception, the `DltAwareProcessor
将输入记录发送到 DLT。
这DltPublishingContext
提供DltAwareProcessor
必要的发布基础架构 Bean。
这DltPublishingContext
由 Binder 自动配置,以便您可以将其直接注入到应用程序中。
如果您不希望活页夹将失败的记录发布到 DLT,则必须使用RecordRecoverableProcessor
直接而不是DltAwareProcessor
.
您可以提供自己的恢复器作为BiConsumer
接受输入Record
并将例外作为参数。
假设这样一个场景,您不想将记录发送到 DLT,而只是记录消息并继续。
下面是如何实现这一目标的示例。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// Handle the record
}));
}
在这种情况下,当记录失败时,RecordRecoverableProcessor
,使用用户提供的恢复器,该恢复器是一个BiConsumer
将失败的记录和抛出的异常作为参数。
在 DltAwareProcessor 中处理记录键
使用DltAwareProcessor
,如果要将记录键发送到 DLT 主题,则需要在 DLT 绑定上设置正确的序列化器。
这是因为,DltAwareProcessor
使用StreamBridge
它使用常规的 Kafka 绑定器(基于消息通道),默认情况下使用ByteArraySerializer
对于钥匙。
对于记录值,Spring Cloud Stream 将有效负载转换为适当的byte[]
;然而,密钥并非如此,因为它只是将标头中接收到的内容作为密钥传递。
如果提供非字节数组键,则可能会导致类强制转换异常,为避免这种情况,需要在 DLT 绑定上设置序列化程序,如下所示。
假设 DLT 目标为hello-dlt-1
记录键为 String 数据类型。
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer