|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
使用 RabbitMQ 绑定程序进行分区
RabbitMQ 不原生支持分区。
有时,向特定分区发送数据是有利的——例如,当您希望严格按顺序处理消息时,某个客户的全部消息应发送到同一个分区。
该 RabbitMessageChannelBinder 通过为每个分区绑定一个队列到目标交换器来提供分区。
以下 Java 和 YAML 示例显示了如何配置生产者:
生产者
@SpringBootApplication
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
|
前面示例中的配置使用了默认分区(
|
以下配置设置了一个主题交换:
以下队列结维到该 Exchange:
以下绑定将队列与交换机关联:
下面的 Java 和 YAML 示例继续了前面的例子,展示了如何配置消费者:
消费者
@SpringBootApplication
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
0不支持动态缩放。 每个分区必须至少有一个消费者。 消费者的1用于指示哪个分区被消费。 如Cloud Foundry等平台只能有一个实例带有2。 |