|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
RabbitMQ 流插件的初始消费者支持
基本支持 RabbitMQ 流插件 现已提供。
要启用此功能,必须向类路径添加 spring-rabbit-stream jar 文件 - 它必须与 spring-amqp 和 spring-rabbit 版本相同。
上面描述的消费者属性在设置containerType属性为stream时不支持;仅支持超流上的concurrency。
每个绑定只能由一个流队列消费。 |
要让绑定程序使用containerType=stream,Spring Boot会自动从应用程序属性中配置一个Environment@Bean。您还可以可选地添加一个定制器来自定义侦听器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
传递给自定义器的 name 参数是 destination + '.' + group + '.container'。
流name()(用于偏移量跟踪)被设置为绑定destination + '.' + group。可以使用上图中的ConsumerCustomizer进行更改。如果您决定使用手动偏移量跟踪,则可以通过消息头获取Context:
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有关配置环境和消费者构建器的信息,请参阅RabbitMQ Stream Java客户端文档。
消费者支持 RabbitMQ 超级流
见 Super Streams 以了解有关超级流的信息。
使用超级流允许在超级流的每个分区上仅有一个活动消费者时自动扩展和缩减。
配置示例:
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
框架将创建一个名为 super 的超级流,包含 9 个分区。
最多可以部署此应用程序的 3 个实例。