|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
重试关键业务逻辑
在某些情况下,您可能需要重试对应用程序至关重要的业务逻辑部分。
可能存在对关系型数据库的外部调用或从 Kafka Streams 处理器调用 REST 端点。
由于网络问题或远程服务不可用等原因,这些调用可能会失败。
通常,如果您可以再次尝试,这些故障会自行解决。
默认情况下,Kafka Streams 绑定程序为所有输入绑定创建 RetryTemplate 个 bean。
如果函数具有以下签名,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
并且使用默认绑定名称时,RetryTemplate 将被注册为 process-in-0-RetryTemplate。这遵循了绑定名称(process-in-0)后跟字面量 -RetryTemplate 的约定。在多个输入绑定的情况下,每个绑定都会有一个单独的 RetryTemplate Bean 可用。如果应用程序中存在通过 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName 提供的自定义 RetryTemplate Bean,则该 Bean 优先于任何输入绑定级别的重试模板配置属性。
一改技术从绑定中注入的 RetryTemplate 可以运用于重试应用程序的任何重要部分.
以下是一个例子:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者您可以像下面这样使用自定义的 RetryTemplate。
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
请注意,当重试次数耗尽时,默认情况下会抛出最后一个异常,导致处理器终止。如果您希望处理该异常并继续处理,可以向execute方法添加一个RecoveryCallback:以下是示例。
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关 RetryTemplate、重试策略、退避策略等方面的更多信息,请参阅Spring Retry项目。