对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
具有基于 Kafka Streams 的绑定器和常规 Kafka 绑定器的多绑定器
您可以拥有一个应用程序,其中既有基于常规 Kafka 绑定器的功能/使用者/提供商,又有基于 Kafka Streams 的处理器。 但是,您不能在单个函数或消费者中混合使用它们。
下面是一个示例,在同一应用程序中具有两个基于 Binder 的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是配置中的相关部分:
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您拥有与上述相同的应用程序,但正在处理两个不同的 Kafka 集群,例如常规process
同时作用于 Kafka 集群 1 和集群 2(从 cluster-1 接收数据并发送到 cluster-2),并且 Kafka Streams 处理器作用于 Kafka 集群 2。
然后你必须使用 Spring Cloud Stream 提供的多绑定器工具。
下面是配置在这种情况下可能发生的变化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意以上配置。
我们有两种绑定器,但总共有 3 个绑定器,第一种是基于集群 1 (kafka1
),然后是另一个基于集群 2 (kafka2
),最后是kstream
一个 (kafka3
).
应用程序中的第一个处理器从kafka1
并发布到kafka2
其中两个绑定器都基于常规的 Kafka 绑定器,但集群不同。
第二个处理器是 Kafka Streams 处理器,它使用kafka3
与kafka2
,但粘合剂类型不同。
由于 Kafka Streams 系列绑定器中有三种不同的绑定器类型可用 -kstream
,ktable
和globalktable
- 如果您的应用程序具有基于这些绑定中的任何一个的多个绑定,则需要将其显式作为绑定器类型提供。
例如,如果您有如下处理器,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
然后,必须在多活页夹方案中按如下方式配置。 请注意,仅当您有一个真正的多绑定器方案时,才需要这样做,其中有多个处理器在单个应用程序中处理多个集群。 在这种情况下,需要显式为绑定器提供绑定,以区别于其他处理器的绑定器类型和集群。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.