错误处理
Apache Kafka Streams 提供了从反序列化错误处理异常的功能。
有关此支持的详细信息,请单击此处。
出站,Apache Kafka Streams 为反序列化异常提供两种类型的处理程序 - LogAndContinueExceptionHandler 和 LogAndFailExceptionHandler。
顾名思义,前者会记录错误并继续处理下一条记录,后者会记录错误并失败。 LogAndFailExceptionHandler 是默认的反序列化异常处理程序。
处理绑定中的反序列化异常
Kafka 流绑定器允许使用以下属性指定反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
or
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
在上面两个反序列化异常处理程序之外,绑定器还提供了一个第三个处理程序,用于将错误记录(毒药药丸)发送到 DLQ(死信队列)主题。 下面是如何启用此 DLQ 异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
当设置此属性时,反序列化错误中的所有记录都会自动发送到DLQ主题。
您可以在下面设置 DLQ 消息发布的话题名称。
你可以在实现类中提供对 0 接口的支持,该接口是函数式接口。1 可以接收 2 和异常作为输入,并允许指定主题名称作为输出。通过获取 Kafka 3 的访问权限,在 4 的实现中可以检查头记录。
这里是提供 DlqDestinationResolver 的实现示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为DlqDestinationResolver提供实现时需要注意,绑定器中的提供器不会自动为应用程序创建主题。
这是因为绑定器无法推断实现可能发送到的所有DLQ主题的名称。
因此,如果使用这种策略提供DLQ名称,就由应用程序负责确保这些主题在之前已创建好。
如果在应用中存在值为DlqDestinationResolver的bean,它将具有更高的优先级。
如果您不希望遵循这种方法,而是使用配置提供静态DLQ名称,您可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果已设置此选项,则错误记录会发送到主题custom-dlq。
如果应用程序未使用上述任何一种策略,则会创建一个具有名称error.<input-topic-name>.<application-id>的死信队列(DLQ)主题。
例如,如果绑定的目标主题为inputTopic,且应用 ID 为process-applicationId,则默认 DLQ 主题为error.inputTopic.process-applicationId。
通常建议为每个输入绑定显式创建一个 DLQ 主题,以便启用 DLQ。
DLQ per input consumer 绑定
属性spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。这意味着如果同一应用程序中有多个函数,该属性将应用于所有这些属性。但是,如果您在同一处理器中有多个处理器或多个输入绑定,则可以使用每输入使用者绑定 Binder 提供的更精细的 DLQ 控制。
如果拥有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
如果您只想在第一个输入绑定上启用DLQ,跳过并继续第二个绑定,那么您可以在使用者上执行此操作,如下所示。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以这种方式设置反序列化异常处理程序具有比在绑定器级别设置更高的优先级。
死信队列分区
按默认值,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少具有与原始记录相同的分区数。
若要更改此行为,请将DlqPartitionFunction实现作为@Bean添加到应用程序上下文中。只能存在一个这样的bean。该函数会提供消费者组(在大多数情况下与应用ID相同)、失败的ConsumerRecord和异常。例如,如果您始终希望路由到分区0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果您将消费者绑定的dlqPartitions属性设置为1(且绑定器的minPartitionCount等于1),则无需提供DlqPartitionFunction;框架将始终使用分区0。如果您将消费者绑定的 |
使用Kafka Streams绑定中的异常处理功能时需要注意的一些事项。
-
属性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。这意味着,如果同一应用程序中有多个函数,则该属性会应用于所有这些函数。 -
反序列化时的异常处理与原生反序列化以及框架提供的消息转换一致。
处理绑定器中的生产异常
与上述描述的反序列化异常处理器支持不同,绑定器不提供此类用于处理生产异常的一流机制。然而,您仍然可以使用StreamsBuilderFactoryBean自定义程序来配置生产异常处理器,有关更多信息,请参阅下面后续部分。
运行时错误处理
当涉及到从应用程序代码(即业务逻辑执行)处理错误时,通常由应用程序自行处理。
因为,Kafka Streams 绑定程序无法干扰应用程序代码。
然而,为了使应用程序的处理更简便,绑定程序提供了一个便捷的 RecordRecoverableProcessor,使用它,你可以指定你希望如何处理应用程序级别的错误。
考虑以下代码。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
如果上述map调用内的业务代码抛出异常,则由您负责处理该错误。
这就是RecordRecoverableProcessor派上用场的地方。
默认情况下,RecordRecoverableProcessor会记录错误并让应用程序继续运行。
假设您想将失败的记录发布到DLT,而不是在应用程序中进行处理。
在这种情况下,您必须使用名为DltAwareProcessor的RecordRecoverableProcessor自定义实现。
以下是操作方法。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
原始 map 调用中的业务逻辑代码现在已被移至 KStream#process 方法调用中,该方法调用接受一个 ProcessorSupplier。
然后,我们将能够向 DLT 发布的自定义 DltAwareProcessor, 传入。
上面的 DltAwareProcessor 构造函数需要三个参数——一个 Function,用于获取输入记录并将业务逻辑操作作为 Function 的主体,DLT 主题以及最后的 DltPublishingContext。
当 Function’s lambda expression throws an exception, the `DltAwareProcessor 将把输入记录发送到 DLT时。
DltPublishingContext 提供了必要的发布基础设施 Bean。DltPublishingContext 由绑定器自动配置,因此您可以直接将其注入应用程序。
如果您不希望绑定器将失败的记录发布到死信队列(DLT),则必须直接使用RecordRecoverableProcessor而不是DltAwareProcessor。
您可以提供自己的恢复器作为BiConsumer,它接受输入Record和异常作为参数。
假设一种场景,在该场景中您不希望将记录发送到DLT,而只需记录消息并继续。
以下是实现此目的的一个示例。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// Handle the record
}));
}
在这种情况下,当记录失败时,RecordRecoverableProcessor使用用户提供的恢复器,该恢复器是一个BiConsumer,它接受失败的记录和抛出的异常作为参数。
处理 DltAwareProcessor 中的记录键
在使用 DltAwareProcessor 将失败记录发送到 DLT 时,如果您希望将记录键发送到 DLT 主题,则需要在 DLT 绑定上设置适当的序列化程序。
这是因为,DltAwareProcessor 使用 StreamBridge,而 StreamBridge 又使用常规的 Kafka 绑定(基于消息通道),默认情况下会为键使用 ByteArraySerializer。
对于记录值的情况,Spring Cloud Stream 会将有效负载转换为正确的 byte[];然而,对于键来说并非如此,它只是简单地将接收到的标头中的内容作为键传递过去。
如果您提供了非字节数组键,这可能会导致类转换异常,为了避免这种情况,您需要按照下面的方式在 DLT 绑定上设置一个序列化程序。
假设 DLT 目的地为 hello-dlt-1 且记录键的数据类型为字符串。
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer