此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
重试关键业务逻辑
在某些情况下,您可能希望重试业务逻辑中对应用程序至关重要的部分。
可能会对关系数据库进行外部调用,或者从 Kafka Streams 处理器调用 REST 端点。
这些调用可能会因各种原因而失败,例如网络问题或远程服务不可用。
更常见的是,如果您可以重试,这些故障可能会自行解决。
默认情况下,Kafka Streams 绑定器会创建RetryTemplate
所有输入绑定的 bean。
如果函数具有以下签名,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
和默认绑定名称,RetryTemplate
将注册为process-in-0-RetryTemplate
.
这遵循绑定名称 (process-in-0
)后跟文字-RetryTemplate
.
在多个输入绑定的情况下,将有一个单独的RetryTemplate
每个绑定可用的 bean。
如果有自定义RetryTemplate
应用程序中可用的 bean 并通过spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
,则该属性优先于任何输入绑定级别重试模板配置属性。
一旦RetryTemplate
从绑定注入到应用程序中时,它可用于重试应用程序的任何关键部分。
这是一个例子:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者您可以使用自定义RetryTemplate
如下。
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
请注意,当重试用尽时,默认情况下,将抛出最后一个异常,导致处理器终止。
如果您希望处理异常并继续处理,您可以将 RecoveryCallback 添加到execute
方法:
这是一个例子。
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关 RetryTemplate、重试策略、退避策略等的更多信息,请参阅 Spring Retry 项目。