此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

使用响应式 Kafka Binder 的基本示例

在本节中,我们将展示一些使用响应式绑定器编写响应式 Kafka 应用程序的基本代码片段以及围绕它们的详细信息。spring-doc.cadn.net.cn

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

您可以使用上述upppercase函数与基于消息通道的 Kafka 绑定器 (spring-cloud-stream-binder-kafka)以及反应性 Kafka 绑定剂 (spring-cloud-stream-binder-kafka-reactive),本节讨论的主题。 当将此函数与常规 Kafka 绑定器一起使用时,尽管您在应用程序中使用响应式类型(即,在uppercase函数),您只能在函数执行过程中获得响应式流。 在函数的执行上下文之外,没有响应式好处,因为底层绑定器不是基于响应式堆栈。 因此,尽管这看起来像是带来了完整的端到端响应式堆栈,但此应用程序仅具有部分响应性。spring-doc.cadn.net.cn

现在假设您正在为 Kafka 使用适当的响应式绑定器 -spring-cloud-stream-binder-kafka-reactive与上述函数的应用。 这种 binder 实现将提供从顶端消费到链底端发布的全部响应式好处。 这是因为底层绑定器是建立在 Reactor Kafka 的核心 API 之上的。 在消费者方面,它利用了 KafkaReceiver,它是 Kafka 消费者的响应式实现。 同样,在生产者端,它使用 KafkaSender API,它是 Kafka 生产者的响应式实现。 由于响应式 Kafka 绑定器的基础是建立在适当的响应式 Kafka API 之上的,因此应用程序可以获得使用响应式技术的全部好处。 使用这个响应式 Kafka 绑定器时,应用程序内置了自动背压等响应式功能。spring-doc.cadn.net.cn

从 4.0.2 版开始,您可以自定义ReceiverOptionsSenderOptions通过提供一个或多个ReceiverOptionsCustomizerSenderOptionsCustomizerBeans。 他们是BiFunctions 接收绑定名称和初始选项,返回自定义选项。 接口扩展Ordered因此,当存在多个定制器时,将按所需的顺序应用定制器。spring-doc.cadn.net.cn

默认情况下,活页夹不提交偏移量。 从 4.0.2 版开始,KafkaHeaders.ACKNOWLEDGMENT标头包含一个ReceiverOffset对象,它允许您通过调用其acknowledge()commit()方法。
@Bean
public Consumer<Flux<Message<String>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

请参阅reactor-kafka文档和 JavaDocs 了解更多信息。spring-doc.cadn.net.cn

此外,从 4.0.3 版本开始,Kafka 消费者属性reactiveAtmostOnce可以设置为true并且 Binder 将在处理每次轮询返回的记录之前自动提交偏移量。 此外,从版本 4.0.3 开始,您可以设置 consumer 属性reactiveAutoCommittrue并且 binder 将在处理每次轮询返回的记录后自动提交偏移量。 在这些情况下,确认标头不存在。spring-doc.cadn.net.cn

4.0.2 也提供reactiveAutoCommit,但实现不正确,它的行为类似于reactiveAtMostOnce.

下面是一个如何使用的示例reaciveAutoCommit.spring-doc.cadn.net.cn

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

请注意reactor-kafka返回一个Flux<Flux<ConsumerRecord<?, ?>>>使用自动提交时。 鉴于 Spring 无法访问内部通量的内容,应用程序必须处理本机ConsumerRecord;没有对内容应用消息转换或转换服务。 这需要使用本机解码(通过指定Deserializer配置中适当类型的)返回所需类型的记录键/值。spring-doc.cadn.net.cn