这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 spring-doc.cadn.net.cn

Ancillaries to the programming model

单个应用程序中的多个 Kafka 流处理器

Binder 允许在一个单个的 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。</p><p>您可以拥有如下的应用程序。spring-doc.cadn.net.cn

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在本例中,绑定器将创建具有不同应用ID(稍后详细介绍)的3个单独的Kafka Streams对象。<br>但是,如果您有多个处理器,则必须告诉Spring Cloud Stream,需要激活哪些函数。这是激活这些函数的方法。spring-doc.cadn.net.cn

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcessspring-doc.cadn.net.cn

如果您希望某些功能在一开始时不立即激活,可以从这个列表中移除它们。spring-doc.cadn.net.cn

这也同样适用于在同一应用程序中有一个单独的Kafka Streams处理程序,并且还有通过不同绑定器(例如基于常规Kafka消息通道绑定器的函数Bean)处理的其他类型为Function的Beanspring-doc.cadn.net.cn

Kafka 流应用ID

应用程序 ID 是 Kafka 流应用程序所需属性之一。 Spring Cloud Stream Kafka 流绑定程序允许您通过多种方式配置此应用程序 ID。spring-doc.cadn.net.cn

If you only have one single processor in the application, 那么你可以通过设置 binder 水平的以下属性来实现:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.applicationId.spring-doc.cadn.net.cn

作为一个方便的选项,如果你只有一台处理器,也可以使用 spring.application.name 作为属性将应用程序 id 委托出去。spring-doc.cadn.net.cn

如果您的应用程序中有多个 Kafka 流处理器,那么需要为每个处理器设置应用程序 ID。在函数式模型中,可以将其附加到每个函数作为属性。spring-doc.cadn.net.cn

例如,假设您有以下功能。spring-doc.cadn.net.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后您可以为每个使用以下绑定器级属性设置应用程序ID。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.functions.process.applicationIdspring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationIdspring-doc.cadn.net.cn

对于基于函数的模型,也可以在此处设置应用程序 ID。如果使用的是函数式模型,则如上文所示在绑定程序级别设置每函数会更简单。spring-doc.cadn.net.cn

在生产部署中,强烈建议通过配置显式指定应用程序ID。 如果您正在自动扩展应用程序,则需要确保每个实例都使用相同的application ID进行部署,这尤其重要。spring-doc.cadn.net.cn

如果应用程序没有提供应用程序ID,则在这种情况下,绑定器将为您自动生成一个静态应用程序ID。 这在开发方案中很方便,因为它无需显式提供应用程序ID。 这样生成的应用程序ID在整个应用程序重新启动期间都是静态的。 在函数模型的情况下,生成的应用程序ID将是函数 bean 名称后跟字面量applicationID,例如process-applicationID如果process如果函数 bean 名称是。spring-doc.cadn.net.cn

spring 概述

  • 默认情况下,binder 将根据功能方法自动为应用程序生成 ID。spring-doc.cadn.net.cn

  • 如果您有单处理器,那么您可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationIdspring-doc.cadn.net.cn

  • 如果要为每个函数设置多个处理器和应用ID,请使用属性-spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationIdspring-doc.cadn.net.cn

重写绑定器生成的默认绑定名,使用函数式风格

默认情况下,当使用函数式风格时,绑定器使用上述策略生成绑定名称,即 function-bean-name-in|-out-[0..n],例如 process-in-0、process-out-0 等。</p><p>如果您想覆盖这些绑定名称,可以这样做,通过指定以下属性。spring-doc.cadn.net.cn

0. 绑定的默认名称是绑定器生成的原始绑定名称。spring-doc.cadn.net.cn

例如,假设你有一个这个函数。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 将生成具有名称的绑定,process-in-0process-in-1process-out-0。 现在,如果你希望将它们完全更改为其他不同的名称,比如更具体的领域绑定名称,你可以按下面的方式进行更改。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-in-0=usersspring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-in-0=regionsspring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-out-0=clicksspring-doc.cadn.net.cn

在那之后,您必须在这些新的绑定名称上设置所有绑定级别属性。spring-doc.cadn.net.cn

请注意,即使使用上述功能性编程模型,大多数情况下遵循默认绑定名称也是有意义的。 如果您有大量配置属性,并且希望将绑定映射到更友好的领域,那么仍然可能需要这样做覆盖。spring-doc.cadn.net.cn

设置启动服务器配置

当运行 Kafka 流应用程序时,您必须提供 Kafka broker 服务器信息。 如果您没有提供这些信息,那么 binder 将假定您正在运行 broker 的默认值为 localhost:9092。 如果情况并非如此,那么您需要进行重写。这里有几种方法可以做到这一点。spring-doc.cadn.net.cn

当涉及到绑定程序级别属性时,无论您使用的是通过常规 Kafka 绑定程序提供的代理属性 spring.cloud.stream.kafka.binder.brokers。Kafka 流绑定程序将首先检查是否设置了 Kafka 流绑定程序特定代理属性 spring.cloud.stream.kafka.streams.binder.brokers,如果未找到,则查找 spring.cloud.stream.kafka.binder.brokersspring-doc.cadn.net.cn