|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
使用响应式Kafka绑定器的基本示例
本节展示了使用响应式绑定器编写响应式Kafka应用程序的一些基本代码片段,并详细介绍了它们。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以使用上述uppercase功能与基于消息通道的Kafka绑定器(spring-cloud-stream-binder-kafka)以及响应式Kafka绑定器(spring-cloud-stream-binder-kafka-reactive)一起使用,讨论的主题是本节。
如果使用此功能与常规Kafka绑定器一起使用,虽然在应用程序中使用了反应类型(即uppercase功能),但在函数执行范围内只有反应流。
在函数执行上下文之外,没有响应式优势,因为底层绑定器不是基于反应堆栈的。因此,尽管这看起来像是带来端到端反应堆栈,但该应用程序只是部分响应式的。
现在假设您正在使用适当的响应式绑定器用于Kafka -0 与上面函数的应用。此绑定器实现将在整个链的顶端到底部的发布处提供完整的反应性优势。这是因为在基础绑定器之上构建了
从 4.0.2 版本开始,您可以使用提供一个或多个ReceiverOptionsCustomizer或SenderOptionsCustomizerbean来自定义ReceiverOptions和SenderOptions。它们是接收绑定名称和初始选项并返回自定义选项的BiFunction。Ordered接口扩展使当存在多个时定制器将按所需的顺序应用。
编绑器不默认提交偏移量。
从版本 4.0.2 开始,标题 0 包含一个 1 对象,这允许通过调用其 2 方法或 3 方法来导致偏移量被提交。 |
@Bean
public Consumer<Flux<Message<String>>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
有关更多详细信息,请参阅reactor-kafka文档和javadoc。
此外,从版本4.0.3开始,可以将Kafka消费者属性reactiveAtmostOnce设置为true,binder将在处理每次poll返回的记录之前自动提交偏移量。
同样,从版本4.0.3开始,您可以将消费者属性reactiveAutoCommit设置为true,binder将在处理每次poll返回的记录后自动提交偏移量。
在这些情况下,确认头不存在。
4.0.2 虽然也提供了 reactiveAutoCommit,但实现不正确,其行为类似于 reactiveAtMostOnce。 |
以下是一个如何使用reactiveAutoCommit的示例。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,使用自动提交时,reactor-kafka 返回 Flux<Flux<ConsumerRecord<?, ?>>>。
由于 Spring 对内部流的内容没有访问权限,应用程序必须处理原生的 ConsumerRecord;不会对内容应用消息转换或转换服务。
这需要使用原生解码(在配置中指定适当类型的 Deserializer)来返回所需类型的记录键/值。