|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
错误处理
在本节中,我们将解释框架提供的错误处理机制的一般概念。我们将以Rabbit绑定器为例,因为每个绑定器为特定支持的机制定义了一组不同的属性,这些机制特定于底层代理的功能集(如Kafka绑定器)。
发生错误,Spring Cloud Stream 提供了几种灵活的处理机制。 请注意,这些技术取决于绑定程序实现以及底层消息传递中间件的功能以及编程模型(稍后更多)。
每当消息处理程序(函数)抛出异常,它就会被传播回绑定器,在此点上绑定器将尝试多次重试同一消息(默认为 3 次)使用 0 由 Spring Retry 库提供。 如果重试失败,取决于错误处理机制可能 删除 消息、重新排队 消息进行重新处理或 发送失败的消息到 DLQ 。
两者(Rabbit 和 Kafka)都支持这些概念(尤其是DLQ)。然而,其他绑定器可能不支持,因此请参阅您所使用绑定器的文档以了解支持的错误处理选项。
请注意,reactive 函数 不符合消息处理器的标准,因为它不处理单个消息,而是提供了一种将框架提供的流(即 Flux)与用户提供的流连接起来的方法。 为什么这很重要? 这是因为本节后面讨论的所有重试模板、丢弃失败的消息、重试、DLQ 和有助于所有这些的配置属性都仅适用于消息处理器(即面向对象的函数)
重申:请确保你的答案完全符合前一个回复中描述的格式和示例。如果格式不正确,你可能会提供无效或部分答案,这会导致你在后续回复中失去上下文。
响应式 API 提供了非常丰富的库级运算符和其他机制,可用于处理各种响应式用例中的错误,这些用例比简单的消息处理器用例要复杂得多。因此,请使用它们,例如在reactor.core.publisher.Flux中找到的public final Flux<T> retryWhen(Retry retrySpec);。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux
.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
.map(v -> v.toUpperCase());
}
丢弃失败消息
默认情况下,系统提供错误处理程序。第一个错误处理程序只是记录错误消息。第二个错误处理程序是与绑定器相关的错误处理程序,它负责处理特定消息传递系统(例如,发送到 DLQ)中的错误消息。但在这种当前场景中没有提供任何其他错误处理配置,因此此处理器将不做任何操作。因此,经过记录后,消息将被丢弃。
在某些情况下可以接受,但在大多数情况下不行,我们需要一些恢复机制来避免消息丢失。
处理错误消息
在上一节中,我们提到,默认情况下,导致错误的消息会有效地记录并丢弃。框架还公开了让您提供自定义错误处理程序(例如,发送通知或写入数据库等)的方法。您可以通过添加来执行此操作Consumer这专门设计为接受ErrorMessage除了错误信息外(例如,堆栈跟踪等),它还包含原始消息(触发错误的那条消息)。
| 自定义错误处理程序与框架提供的错误处理程序(即日志错误处理程序和绑定特定错误处理程序——参见上一节)互斥,以确保它们不会互相干扰。当你提供一个自定义错误处理程序时,即使已配置为将失败消息发送到DLQ,该功能也不会生效。 |
@Bean
public Consumer<ErrorMessage> myErrorHandler() {
return v -> {
// send SMS notification code
};
}
要将此类消费者标识为错误处理器,您只需要提供一个指向函数名称的error-handler-definition属性-1。
对于例如,绑定属性名称uppercase-in-0,该属性看起来像这样:(代码)0(代码)
spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler
如果使用特殊映射指令将绑定映射到更可读的名称——spring.cloud.stream.function.bindings.uppercase-in-0=upper,那么此属性看起来如下所示:
spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果意外地将这样的处理程序声明为Function,它仍然可以正常工作,唯一的区别是其输出将不会有任何处理。然而,由于这样的处理程序仍然依赖于Spring Cloud Function提供的功能,如果你的处理程序有一些复杂性,你可以通过函数组合来解决(尽管这种情况可能不太可能)。 |
默认错误处理器
如果你希望所有功能 bean 都有一个统一的错误处理器,你可以使用标准的 spring-cloud-stream 机制来定义默认属性spring.cloud.stream.default.error-handler-definition=myErrorHandler
DLQ - 死信队列
也许最常用的方法是死信队列,它允许将失败的消息发送到特殊目的地:死信队列。
当配置了,失败的消息会被发送到这个目的地进行后续的重新处理或审核以及核算。
考虑以下示例:
@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleStreamApplication.class,
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
"--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
);
}
@Bean
public Function<Person, Person> uppercase() {
return personIn -> {
throw new RuntimeException("intentional");
});
};
}
}
作为提醒,在本示例中,属性目标绑定的uppercase-in-0段对应输入目的地的名称。
consumer段表示这是一个消费者属性。
当使用 DLQ 时,至少必须为 DLQ 目标提供group属性,以便正确命名。然而,在我们的示例中,group通常与destination属性一起使用。 |
除了设置一些标准属性外,我们还将 auto-bind-dlq 设置为指示绑定器创建并配置 DLQ 目的地。 uppercase-in-0 绑定与此对应的 uppercase 目的地(请参阅相应的属性),这导致一个附加的 Rabbit 队列名为 uppercase.myGroup.dlq(有关 Kafka 特定 DLQ 属性,请参阅 Kafka 文档)。
一旦配置好,所有失败的消息都会被路由到这个目标,保留原始消息供进一步操作。
您可以看到错误消息包含更多与原始错误相关的详细信息,如下所示:
. . . .
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at. . . . .
Payload: blah
你还可以通过将max-attempts设置为1,使消息立即被路由到死信队列(无需重试)。例如,
--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1
重试模版
在本节中,我们涵盖了与重试功能配置相关的配置属性。
在 RetryTemplate 中,Spring Retry 库是其中的一部分。虽然本文档不涵盖 RetryTemplate 的所有功能,但我们将提及与 RetryTemplate 直接相关的以下消费者属性:
- maxAttempts
-
处理消息的尝试次数。
默认值:3。
- 重试初始间隔
-
重试时的退避初始间隔。
默认1000毫秒。
- 重试最大间隔
-
最大退避间隔。
默认10000毫秒。
- 退避乘数
-
退避乘数。
默认2.0。
- 默认可重试
-
监听器抛出且未列在
retryableExceptions中的异常是否可重试。默认值:
true。 - 可重试异常
-
一个以 Throwable 类名作为键,布尔值作为值的映射。
指定那些将要或不会重试的异常(以及子类)。
另请参阅defaultRetriable。
示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false。默认:空。
虽然前面的设置对于大多数自定义需求来说已经足够,但可能无法满足某些复杂的要求,在这种情况下,您可能希望提供自己的RetryTemplate实例。为此,请在应用程序配置中将其配置为一个bean。应用程序提供的实例将覆盖框架提供的实例。另外,为了避免冲突,您必须将要由绑定器使用的RetryTemplate实例限定为@StreamRetryTemplate。例如,
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
return new RetryTemplate();
}
从上面的示例中可以看出,您不需要用@Bean注解它,因为@StreamRetryTemplate是合格的@Bean。
如果您需要更精确地控制RetryTemplate,可以在ConsumerProperties中指定bean名称,以便为每个绑定关联特定的重试bean。
spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>