此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
RabbitMQ Stream 插件的初始生产者支持
现在提供了对 RabbitMQ Stream 插件的基本支持。要启用此功能,您必须将spring-rabbit-stream
jar 到类路径 - 它必须与spring-amqp
和spring-rabbit
.
当您将producerType 属性设置为STREAM_SYNC 或STREAM_ASYNC . |
将绑定器配置为使用流ProducerType
,Spring Boot 将配置一个Environment
@Bean
从应用属性。
可以选择添加定制器来自定义消息处理程序。
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
有关配置环境和生产者构建器的信息,请参阅 RabbitMQ Stream Java 客户端文档。
生产者对 RabbitMQ 超级流的支持
有关超级流的信息,请参阅超级流。
使用超级流允许在超级流的每个分区上使用单个活动使用者进行自动纵向扩展、纵向缩减。 使用 Spring Cloud Stream,您可以通过 AMQP 或使用流客户端发布到超级流。
超级流必须已经存在;生产者绑定不支持创建超级流。 |
通过 AMQP 发布到超级流:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端发布到超级流:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端时,如果将confirmAckChannel
,则成功发送的消息的副本将发送到该通道。