|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
分区
Spring Cloud Stream 提供了对多个给定应用程序实例之间数据分区的支持。 在分区场景中,物理通信介质(如消息代理主题)被视作由多个分区组成。 一个或多个生产者应用程序实例向多个消费者应用程序实例发送数据,并确保具有相同特征的数据由同一消费者实例处理。
Spring Cloud Stream 提供了一个通用抽象,用于以统一的方式实现分区处理用例。 因此,无论代理本身是否自然地进行分区(例如,Kafka),还是没有进行分区(例如,RabbitMQ),都可以使用分区。
分区是无状态处理中的关键概念,从性能或一致性角度来说,确保相关数据一起处理至关重要。 例如,在基于时间窗口的平均值计算示例中,重要的是确保同一应用程序实例处理来自任何给定传感器的所有测量值。
| 要设置分区处理场景,您必须同时配置数据生成端和数据消费端。 |
分区在Spring Cloud Stream中包括两个任务:
配置用于分区的输出绑定
你可以通过设置输出绑定的 partitionKeyExpression 或者 partitionKeyExtractorName 属性之一以及其 partitionCount 属性来配置分区数据发送。
例如,下面是有效且典型的配置示例:
spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
根据此示例配置,使用以下逻辑将数据发送到目标分区。
一个分区键值是基于0生成的,它是针对发送到已分区输出绑定的消息进行计算的。1是一个SpEL表达式,它在传出消息(在前面的示例中,它是从消息头3提取出来的)上被评估,用于提取分片密钥。
如果您 SpEL 表达式无法满足您的需求,您也可以通过提供 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的实现并将其配置为 bean(使用 @Bean 注释)来计算分区键值。 如果在应用程序上下文中存在多个类型为 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的 bean,则可以使用 partitionKeyExtractorName 属性指定其名称进行进一步筛选,如以下示例所示:
--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
return new CustomPartitionKeyExtractorClass();
}
在 Spring Cloud Stream 的早期版本中,可以通过设置 spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass 属性来指定 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的实现。
自 3.0 版本起,该属性已移除。 |
一旦确定了消息密钥,分区选择过程就会确定目标分区的值,在0和partitionCount - 1之间。
默认计算方法适用于大多数情况,基于以下公式: key.hashCode() % partitionCount。可以在绑定上自定义它,要么通过设置一个SpEL表达式在键(通过partitionSelectorExpression属性)上进行评估,要么通过使用@Bean注释配置org.springframework.cloud.stream.binder.PartitionSelectorStrategy的一个实现作为bean。
与PartitionKeyExtractorStrategy类似,当在这种类型的bean在应用程序上下文中可用多个时,可以进一步使用spring.cloud.stream.bindings.output.producer.partitionSelectorName属性对其进行过滤,如以下示例所示:
--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
return new CustomPartitionSelectorClass();
}
在以前版本的Spring Cloud Stream中,可以通过设置org.springframework.cloud.stream.binder.PartitionSelectorStrategy属性来指定spring.cloud.stream.bindings.output.producer.partitionSelectorClass的实现。
自3.0版以来,此属性已删除。 |
Spring 框架配置输入绑定以进行分片处理
一个输入绑定(名为uppercase-in-0)通过设置其partitioned属性,以及在应用程序上设置instanceIndex和instanceCount属性来配置为接收分区化数据,如下面的例子所示:
spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true spring.cloud.stream.instanceIndex=3 spring.cloud.stream.instanceCount=5
instanceCount 值表示应该在其中分割数据的应用程序实例的总数。要分区。instanceIndex 必须是多个实例之间的唯一值,其值介于0和instanceCount - 1之间。The instance index helps each application instance to identify the unique partition(s) from which it receives data.它由绑定器使用,这些绑定器使用不直接支持分区技术的技术。例如,对于RabbitMQ,每个分片都有一个队列,队列名包含实例索引。与Kafka不同,如果autoRebalanceEnabled是true(默认值),则Kafka负责跨实例分发分区,这些属性是不需要的。如果将autoRebalanceEnabled设置为false,绑定器将使用instanceCount和instanceIndex来确定实例订阅哪个分区(每个分区都必须至少有一个实例)。绑定器分配分区,而不是Kafka。这是有帮助的,如果你想让一个特定分区的消息总是在同一个实例中进行处理。请记住,当绑定程序配置需要它们时,必须正确设置这两个值,以确保所有数据都被消耗,并且应用程序实例接收互斥的数据集。
当在一个独立的案例中使用多个实例处理分区数据时,设置过程可能会很复杂,但Spring Cloud Dataflow可以通过正确地填充输入和输出值,并依靠运行时基础架构提供关于实例索引和实例计数的信息,显著简化该过程。