带有Kafka Streams绑定器和常规Kafka绑定器的多个绑定程序
你可以在一个应用程序中同时拥有基于常规Kafka绑定器的功能/消费者/提供商和基于Kafka Streams的处理器。 然而,你不能在同一功能或消费者中混合使用这两种方式。
此处是一个示例,其中在同一应用程序中同时存在基于绑定的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是配置的相关部分。
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您的应用与上面的示例相似,但同时处理两个不同的Kafka集群,则事情会变得更加复杂,例如,process同时连接到Kafka集群1和集群2(从集群-1接收数据并发送到集群-2),而Kafka流处理器仅连接到Kafka集群2。
这时,您需要使用Spring Cloud Stream提供的多绑定器功能。
这是您在该场景中可能需要更改的配置方式。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意以上配置。我们有两种类型的绑定器,但实际上有 3 种绑定器,第一个是基于群集 1(kafka1)的常规 Kafka 绑定器,然后是基于群集 2(kafka2)的另一个 Kafka 绑定器,以及最后一个(kstream)(kafka3)。 第一个处理器从 kafka1 接收数据并发布到 kafka2,其中这两个绑定器都基于常规 Kafka 绑定器但不同的群集。 第二个处理器是一个 Kafka 流处理器,它从 kafka3 消费数据,该组与 kafka2 同一集群,但不同类型的绑定器。
因为Kafka Streams绑定器家族中有三种不同的绑定器类型可用——kstream、ktable和globalktable——如果您的应用基于这些绑定器中的任何一种有多个绑定,那么需要显式地提供作为绑定器类型。
对于例如,如果你有一个处理器如下,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
然后,在多绑定程序的情况下,必须按如下所示进行配置。 请注意,只有在有真正的多绑定程序场景中,即在一个应用程序中有多个处理器处理多个群集时,才需要这样做。 在这种情况下,需要显式地向绑定器提供绑定,以区分其他处理器绑定器类型和群集。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.