对于最新稳定版本,请使用spring-cloud-stream 5.0.1spring-doc.cadn.net.cn

初始对 RabbitMQ 流插件的生产者支持

基本支持 RabbitMQ 流插件 现已提供。 要启用此功能,必须向类路径添加 spring-rabbit-stream jar 文件 - 它必须与 spring-amqpspring-rabbit 版本相同。spring-doc.cadn.net.cn

当上述生产者属性设置为 STREAM_SYNCSTREAM_ASYNC 时,描述的生产者属性不被支持。

要配置绑定器以使用流 ProducerType,Spring Boot 将从应用程序属性中配置一个 Environment @Bean。您可以选择添加自定义程序来定制消息处理器。spring-doc.cadn.net.cn

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and producer builder.spring-doc.cadn.net.cn

用于RabbitMQ超级流的生产者支持

Super Streams 以了解有关超级流的信息。spring-doc.cadn.net.cn

使用超流可以单个活动使用者与超级流的每个分区一起自动扩展,具有单一的主动使用者。使用Spring Cloud Stream,您可以使用AMQP或使用流客户端向超级流发布。spring-doc.cadn.net.cn

Super 流必须已经存在;不支持通过生产者绑定创建 Super 流。

发布到 AMQP 超流:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客户端向超级流发布:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客户端时,如果将 confirmAckChannel 设置为通道,则会将成功发送的消息副本发送到该通道。spring-doc.cadn.net.cn