初始对 RabbitMQ 流插件的生产者支持
基本支持 RabbitMQ 流插件 现已提供。
要启用此功能,必须向类路径添加 spring-rabbit-stream jar 文件 - 它必须与 spring-amqp 和 spring-rabbit 版本相同。
当上述生产者属性设置为 STREAM_SYNC 或 STREAM_ASYNC 时,描述的生产者属性不被支持。 |
要将绑定器配置为使用流 ProducerType,Spring Boot 将会从应用属性中配置一个 Environment @Bean。
可选地,你可以添加一个自定义器以自定义消息处理程序。
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and producer builder.
用于RabbitMQ超级流的生产者支持
见 Super Streams 以了解有关超级流的信息。
使用超流可以单个活动使用者与超级流的每个分区一起自动扩展,具有单一的主动使用者。使用Spring Cloud Stream,您可以使用AMQP或使用流客户端向超级流发布。
| Super 流必须已经存在;不支持通过生产者绑定创建 Super 流。 |
发布到 AMQP 超流:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端向超级流发布:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端时,如果将 confirmAckChannel 设置为通道,则会将成功发送的消息副本发送到该通道。