使用响应式 Kafka Binder 的基本示例
在本节中,我们将展示一些使用响应式绑定器编写响应式 Kafka 应用程序的基本代码片段以及围绕它们的详细信息。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以使用上述uppercase
函数与基于消息通道的 Kafka 绑定器 (spring-cloud-stream-binder-kafka
)以及反应性 Kafka 绑定剂 (spring-cloud-stream-binder-kafka-reactive
),本节讨论的主题。
当将此函数与常规 Kafka 绑定器一起使用时,尽管您在应用程序中使用响应式类型(即,在uppercase
函数),您只能在函数执行过程中获得响应式流。
在函数的执行上下文之外,没有响应式好处,因为底层绑定器不是基于响应式堆栈。
因此,尽管这看起来像是带来了完整的端到端响应式堆栈,但此应用程序仅具有部分响应性。
现在假设您正在为 Kafka 使用适当的响应式绑定器 -spring-cloud-stream-binder-kafka-reactive
与上述函数的应用。
这种 binder 实现将提供从顶端消费到链底端发布的全部响应式好处。
这是因为底层绑定器是建立在 Reactor Kafka 的核心 API 之上的。
在消费者方面,它利用了 KafkaReceiver,它是 Kafka 消费者的响应式实现。
同样,在生产者端,它使用 KafkaSender API,它是 Kafka 生产者的响应式实现。
由于响应式 Kafka 绑定器的基础是建立在适当的响应式 Kafka API 之上的,因此应用程序可以获得使用响应式技术的全部好处。
使用这个响应式 Kafka 绑定器时,应用程序内置了自动背压等响应式功能。
从 4.0.2 版开始,您可以自定义ReceiverOptions
和SenderOptions
通过提供一个或多个ReceiverOptionsCustomizer
或SenderOptionsCustomizer
Beans。
他们是BiFunction
s 接收绑定名称和初始选项,返回自定义选项。
接口扩展Ordered
因此,当存在多个定制器时,将按所需的顺序应用定制器。
默认情况下,活页夹不提交偏移量。
从 4.0.2 版开始,KafkaHeaders.ACKNOWLEDGMENT 标头包含一个ReceiverOffset 对象,它允许您通过调用其acknowledge() 或commit() 方法。 |
@Bean
public Consumer<Flux<Message<String>>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
请参阅reactor-kafka
文档和 JavaDocs 了解更多信息。
此外,从 4.0.3 版本开始,Kafka 消费者属性reactiveAtmostOnce
可以设置为true
并且 Binder 将在处理每次轮询返回的记录之前自动提交偏移量。
此外,从版本 4.0.3 开始,您可以设置 consumer 属性reactiveAutoCommit
自true
并且 binder 将在处理每次轮询返回的记录后自动提交偏移量。
在这些情况下,确认标头不存在。
4.0.2 也提供reactiveAutoCommit ,但实现不正确,它的行为类似于reactiveAtMostOnce . |
下面是一个如何使用的示例reactiveAutoCommit
.
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意reactor-kafka
返回一个Flux<Flux<ConsumerRecord<?, ?>>>
使用自动提交时。
鉴于 Spring 无法访问内部通量的内容,应用程序必须处理本机ConsumerRecord
;没有对内容应用消息转换或转换服务。
这需要使用本机解码(通过指定Deserializer
配置中适当类型的)返回所需类型的记录键/值。