这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 spring-doc.cadn.net.cn

使用 RabbitMQ 绑定程序进行分区

RabbitMQ 不原生支持分区。spring-doc.cadn.net.cn

有时,向特定分区发送数据是有利的——例如,当您希望严格按顺序处理消息时,某个客户的全部消息应发送到同一个分区。spring-doc.cadn.net.cn

RabbitMessageChannelBinder 通过为每个分区绑定一个队列到目标交换器来提供分区。spring-doc.cadn.net.cn

以下 Java 和 YAML 示例显示了如何配置生产者:spring-doc.cadn.net.cn

生产者
@SpringBootApplication
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            generate-out-0:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前面示例中的配置使用了默认分区(key.hashCode() % partitionCount)。根据键值,这可能提供一个适当平衡的算法,也可能不提供。
您可以通过使用 partitionSelectorExpressionpartitionSelectorClass 属性来覆盖此默认设置。spring-doc.cadn.net.cn

required-groups属性仅在需要在生产者部署时为消费者队列配置时才需要。 否则,发送到分区的任何消息在相应的消费者部署之前都会丢失。spring-doc.cadn.net.cn

以下配置设置了一个主题交换:spring-doc.cadn.net.cn

part exchange

以下队列结维到该 Exchange:spring-doc.cadn.net.cn

part queues

以下绑定将队列与交换机关联:spring-doc.cadn.net.cn

part bindings

下面的 Java 和 YAML 示例继续了前面的例子,展示了如何配置消费者:spring-doc.cadn.net.cn

消费者
@SpringBootApplication
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
            System.out.println(in + " received from queue " + queue);
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            listen-in-0:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

0不支持动态缩放。spring-doc.cadn.net.cn

每个分区必须至少有一个消费者。spring-doc.cadn.net.cn

消费者的1用于指示哪个分区被消费。spring-doc.cadn.net.cn

如Cloud Foundry等平台只能有一个实例带有2spring-doc.cadn.net.cn