分区

Spring Cloud Stream 支持在给定应用程序的多个实例之间对数据进行分区。 在分区场景中,物理通信介质(例如代理主题)被视为结构化为多个分区。 一个或多个生产者应用实例将数据发送到多个消费者应用实例,并确保由共同特征标识的数据由同一消费者实例处理。spring-doc.cadn.net.cn

Spring Cloud Stream 提供了一个通用的抽象,用于以统一的方式实现分区处理用例。 因此,无论代理本身是自然分区的(例如,Kafka)还是不分区(例如,RabbitMQ),都可以使用分区。spring-doc.cadn.net.cn

SCSt 分区
图 1.Spring Cloud 流分区

分区是有状态处理中的一个关键概念,确保所有相关数据一起处理至关重要(出于性能或一致性原因)。 例如,在时间窗口平均计算示例中,重要的是来自任何给定传感器的所有测量值都由同一应用程序实例处理。spring-doc.cadn.net.cn

要设置分区处理方案,必须同时配置数据生成端和数据消费端。

Spring Cloud Stream 中的分区包括两个任务:spring-doc.cadn.net.cn

配置用于分区的输出绑定

您可以通过设置其中一个且仅设置一个输出绑定来配置输出绑定以发送分区数据partitionKeyExpressionpartitionKeyExtractorName属性,以及其partitionCount财产。spring-doc.cadn.net.cn

例如,下面是有效的典型配置:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5

根据该示例配置,使用以下逻辑将数据发送到目标分区。spring-doc.cadn.net.cn

分区键的值是根据partitionKeyExpression. 这partitionKeyExpression是根据出站消息计算的 SpEL 表达式(在前面的示例中,它是id从消息头中提取分区键。spring-doc.cadn.net.cn

如果 SpEL 表达式不足以满足您的需求,您可以通过提供org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy并将其配置为 bean(通过使用@Bean注释)。 如果您有多个类型的 beanorg.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy可用,您可以通过使用partitionKeyExtractorName属性,如以下示例所示:spring-doc.cadn.net.cn

--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}
在以前版本的 Spring Cloud Stream 中,您可以指定org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy通过将spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass财产。 从 3.0 版开始,此属性被删除。

计算消息键后,分区选择过程将目标分区确定为0partitionCount - 1. 默认计算适用于大多数方案,基于以下公式:key.hashCode() % partitionCount. 这可以在绑定上自定义,通过设置要针对“键”计算的 SpEL 表达式(通过partitionSelectorExpression属性)或通过配置org.springframework.cloud.stream.binder.PartitionSelectorStrategy作为 bean(通过使用 @Bean 注释)。 类似于PartitionKeyExtractorStrategy,您可以使用spring.cloud.stream.bindings.output.producer.partitionSelectorName当应用程序上下文中有多个此类型的 bean 可用时,属性,如以下示例所示:spring-doc.cadn.net.cn

--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}
在以前版本的 Spring Cloud Stream 中,您可以指定org.springframework.cloud.stream.binder.PartitionSelectorStrategy通过将spring.cloud.stream.bindings.output.producer.partitionSelectorClass财产。 从 3.0 版开始,此属性被删除。

配置用于分区的输入绑定

输入绑定(绑定名称uppercase-in-0) 配置为通过设置其partitioned属性,以及instanceIndexinstanceCount属性,如以下示例所示:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCountvalue 表示应在其中分区数据的应用程序实例总数。 这instanceIndex必须是多个实例中的唯一值,值介于0instanceCount - 1. 实例索引可帮助每个应用程序实例识别其从中接收数据的唯一分区。 使用不支持本机分区的技术的活页夹需要它。 例如,对于 RabbitMQ,每个分区都有一个队列,队列名称包含实例索引。 使用 Kafka,如果autoRebalanceEnabledtrue(默认),Kafka 负责跨实例分发分区,并且不需要这些属性。 如果autoRebalanceEnabled设置为 false,则instanceCountinstanceIndex被绑定器用来确定实例订阅的分区(必须至少与实例数量一样多的分区)。 绑定器分配分区而不是 Kafka。 如果您希望特定分区的消息始终转到同一实例,这可能很有用。 当 Binder 配置需要它们时,必须正确设置这两个值,以确保使用所有数据并确保应用程序实例接收互斥的数据集。spring-doc.cadn.net.cn

虽然在独立情况下使用多个实例进行分区数据处理的场景设置起来可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值并让您依赖运行时基础设施来提供有关实例索引和实例计数的信息来显着简化该过程。spring-doc.cadn.net.cn

测试

Spring Cloud Stream 支持在不连接到消息传递系统的情况下测试您的微服务应用程序。spring-doc.cadn.net.cn