|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
出站的分区支持
一个Kafka Streams处理器通常将处理后的输出发送到出站Kafka主题。
如果出站主题被分区,并且处理器需要将传出数据发送到特定分区,则应用程序需要提供类型为StreamPartitioner的bean。
有关更多详细信息,请参阅StreamPartitioner。
让我们看一些示例。
这是我们已经多次看到的同一处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
这是输出绑定目标:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题 outputTopic 有4个分区,并且您没有提供一个分区策略,Kafka Streams 将使用默认的分区策略,这可能不是根据特定用例想要的结果。假设您希望将任何匹配到 spring 的键发送到分区 0,cloud 到分区 1,stream 到分区 2,而其他所有内容则发送到分区 3。这就是您需要在应用程序中执行的操作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,但是您可以访问记录的关键字/值、主题名称以及分区总数。因此,如有需要,可以实现复杂的分区策略。
您还需要提供此 Bean 名称以及应用程序配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。