Rabbit MQ Binder
参考指南
本指南介绍了 Spring Cloud Stream Binder 的 RabbitMQ 实现。 它包含有关其设计、用法和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 RabbitMQ 特定构造的信息。
1. 用途
要使用 RabbitMQ 绑定器,您可以使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2. RabbitMQ Binder 概述
以下简化图显示了 RabbitMQ 绑定程序的运行方式:

默认情况下,RabbitMQ Binder 实现将每个目标映射到TopicExchange
.
对于每个消费者组,一个Queue
与此绑定TopicExchange
.
每个消费者实例都有对应的 RabbitMQConsumer
实例的Queue
.
对于分区的生产者和消费者,队列以分区索引为后缀,并使用分区索引作为路由键。
对于匿名消费者(那些没有group
属性),则使用自动删除队列(具有随机的唯一名称)。
通过使用可选的autoBindDlq
选项,您可以配置活页夹以创建和配置死信队列 (DLQ)(以及死信交换DLX
,以及路由基础设施)。
默认情况下,死信队列具有目标名称,并附加.dlq
.
如果启用重试 (maxAttempts > 1
),在重试用尽后,失败的消息将传递到 DLQ。
如果禁用重试 (maxAttempts = 1
),您应该将requeueRejected
自false
(缺省值),以便将失败的消息路由到 DLQ,而不是重新排队。
另外republishToDlq
导致 Binder 将失败的消息发布到 DLQ(而不是拒绝它)。
此功能允许其他信息(例如x-exception-stacktrace
header)添加到 headers 中的消息中。
请参阅frameMaxHeadroom
属性有关截断堆栈跟踪的信息。
此选项不需要启用重试。
只需尝试一次即可重新发布失败的邮件。
从 1.2 版本开始,您可以配置重新发布的消息的传递方式。
请参阅republishDeliveryMode
属性.
如果流侦听器抛出ImmediateAcknowledgeAmqpException
,则绕过 DLQ 并直接丢弃消息。
从 2.1 版开始,无论republishToDlq
;以前只有在以下情况下才会出现这种情况republishToDlq
是false
.
设置requeueRejected 自true (与republishToDlq=false ) 会导致消息不断重新排队和重新传递,这可能不是您想要的,除非失败原因是暂时的。
通常,应通过设置maxAttempts 设置为大于 1 或通过设置republishToDlq 自true . |
从 3.1.2 版开始,如果使用者标记为transacted
,发布到 DLQ 将参与交易。
这允许在发布由于某种原因失败时回滚事务(例如,如果用户无权发布到死信交换)。
此外,如果连接工厂配置为发布者确认或返回,则 DLQ 的发布将等待确认并检查返回的消息。
如果收到否定确认或返回的消息,绑定程序将抛出AmqpRejectAndDontRequeueException
,允许代理负责发布到 DLQ,就好像republishToDlq
属性是false
.
有关这些属性的更多信息,请参阅 RabbitMQ Binder 属性。
该框架不提供任何标准机制来使用死信消息(或将它们重新路由回主队列)。 死信队列处理中介绍了一些选项。
当在 Spring Cloud Stream 应用程序中使用多个 RabbitMQ 绑定器时,请务必禁用“RabbitAutoConfiguration”以避免从RabbitAutoConfiguration 应用于两个粘合剂。
您可以使用@SpringBootApplication 注解。 |
从 2.0 版开始,RabbitMessageChannelBinder
将RabbitTemplate.userPublisherConnection
属性设置为true
这样,非事务生产者就可以避免消费者的死锁,如果缓存的连接由于代理上的内存警报而被阻止,则可能会发生这种情况。
目前,一个multiplex Consumer(监听多个队列的单个消费者)仅支持消息驱动的消费者;轮询的使用者只能从单个队列中检索消息。 |
3. 配置选项
本节包含特定于 RabbitMQ Binder 和绑定通道的设置。
有关常规绑定配置选项和属性,请参阅 Spring Cloud Stream 核心文档。
3.1. RabbitMQ Binder 属性
默认情况下,RabbitMQ 绑定器使用 Spring Boot 的ConnectionFactory
.
因此,它支持 RabbitMQ 的所有 Spring Boot 配置选项。
(有关参考,请参阅 Spring Boot 文档)。
RabbitMQ 配置选项使用spring.rabbitmq
前缀。
除了 Spring Boot 选项之外,RabbitMQ 绑定器还支持以下属性:
- spring.cloud.stream.rabbit.binder.admin地址
-
RabbitMQ 管理插件 URL 的逗号分隔列表。 仅在以下情况下使用
nodes
包含多个条目。 此列表中的每个条目都必须在spring.rabbitmq.addresses
. 仅当您使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参阅队列关联性和 LocalizedQueueConnectionFactory。默认值:空。
- spring.cloud.stream.rabbit.binder.nodes
-
以逗号分隔的 RabbitMQ 节点名称列表。 当有多个条目时,用于查找队列所在的服务器地址。 此列表中的每个条目都必须在
spring.rabbitmq.addresses
. 仅当您使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参阅队列关联性和 LocalizedQueueConnectionFactory。默认值:空。
- spring.cloud.stream.rabbit.binder.compressionLevel
-
压缩绑定的压缩级别。 看
java.util.zip.Deflater
.违约:
1
(BEST_LEVEL)。 - spring.cloud.stream.binder.connection-name-prefix
-
用于命名此活页夹创建的连接的连接名称前缀。名称是此前缀,后跟
#n
哪里n
每次打开新连接时递增。默认值:none(Spring AMQP 默认值)。
3.2. RabbitMQ 消费者属性
以下属性仅适用于 Rabbit 使用者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
.
但是,如果需要将同一组属性应用于大多数绑定,请将
避免重复,Spring Cloud Stream 支持为所有通道设置值,
以spring.cloud.stream.rabbit.default.<property>=<value>
.
另外,请记住,绑定特定属性将覆盖其默认中的等效属性。
- 确认模式
-
确认模式。
违约:
AUTO
. - 匿名组前缀
-
当绑定没有
group
属性,则匿名的自动删除队列绑定到目标交换。 此类队列的默认命名策略会导致名为anonymous.<base64 representation of a UUID>
. 设置此属性可将前缀更改为默认值以外的内容。违约:
anonymous.
. - 自动绑定Dlq
-
是否自动声明 DLQ 并将其绑定到绑定程序 DLX。
违约:
false
. - bindingRoutingKey
-
用于将队列绑定到交换的路由密钥(如果
bindQueue
是true
). 可以是多个键 - 请参阅bindingRoutingKeyDelimiter
. 对于分区目标,-<instanceIndex>
附加到每个键。违约:。
#
- bindingRoutingKeyDelimiter
-
当它不为空时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。
违约:
null
. - 绑定队列
-
是否声明队列并将其绑定到目标交换。 将其设置为
false
如果您已经设置了自己的基础设施,并且之前已经创建并绑定了队列。违约:
true
. - consumerTagPrefix
-
用于创建消费者标签;将附加
#n
哪里n
为创建的每个消费者增量。 例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}
.默认值:无 - 代理将生成随机消费者标签。
- 容器类型
-
选择要使用的侦听器容器的类型。 有关更多信息,请参阅 Spring AMQP 文档中的选择容器。 另请参阅 [rabbitmq-stream]。
违约:
simple
- deadLetter队列名称
-
DLQ 的名称
违约:
prefix+destination.dlq
- 死信交换
-
要分配给队列的 DLX。 仅当
autoBindDlq
是true
.默认值:“前缀+DLX”
- deadLetter交换类型
-
要分配给队列的 DLX 类型。 仅当
autoBindDlq
是true
.默认值:“直接”
- deadLetterRouting键
-
要分配给队列的死信路由键。 仅当
autoBindDlq
是true
.违约:
destination
- 声明Dlx
-
是否为目的地申报死信交换。 仅当
autoBindDlq
是true
. 设置为false
如果您有预配置的 DLX。违约:
true
. - 声明交换
-
是否声明目标的交换。
违约:
true
. - 延迟交换
-
是否将交易所声明为
Delayed Message Exchange
. 需要代理上的延迟消息交换插件。 这x-delayed-type
参数设置为exchangeType
.违约:
false
. - dlqBinding参数
-
将 dlq 绑定到死信交换时应用的参数;与
headers
deadLetterExchangeType
以指定要匹配的标头。 例如…dlqBindingArguments.x-match=any
,…dlqBindingArguments.someHeader=someValue
.默认值:空
- dlq死信交换
-
如果声明了 DLQ,则要分配给该队列的 DLX。
违约:
none
- dlqDeadLetterRoutingKey
-
如果声明了 DLQ,则要分配给该队列的死信路由密钥。
违约:
none
- dlq过期
-
删除未使用的死信队列之前多长时间(以毫秒为单位)。
违约:
no expiration
- dlq懒惰
-
使用
x-queue-mode=lazy
论点。 请参阅“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。违约:
false
. - dlq最大长度
-
死信队列中的最大消息数。
违约:
no limit
- dlqMaxLength字节
-
所有消息的死信队列中的最大总字节数。
违约:
no limit
- dlqMax优先级
-
死信队列中邮件的最大优先级 (0-255)。
违约:
none
- dlqOverflow行为
-
在以下情况下要采取的作
dlqMaxLength
或dlqMaxLengthBytes
已超过;现在drop-head
或reject-publish
但请参阅 RabbitMQ 文档。违约:
none
- dlqQuorum.deliveryLimit
-
什么时候
quorum.enabled=true
,设置传递限制,在此之后,邮件将被丢弃或死信。默认值:无 - 代理默认值将适用。
- dlqQuorum.enabled
-
如果为 true,请创建仲裁死信队列,而不是经典队列。
默认值:false
- dlqQuorum.initialGroupSize
-
什么时候
quorum.enabled=true
,设置初始仲裁大小。默认值:无 - 代理默认值将适用。
- dlqSingleActiveConsumer
-
设置为 true 以设置
x-single-active-consumer
queue 属性设置为 true。违约:
false
- dlqTtl
-
声明时应用于死信队列的默认生存时间(以毫秒为单位)。
违约:
no limit
- 持久订阅
-
订阅是否应持久。 仅当
group
也设置了。违约:
true
. - 交易所自动删除
-
如果
declareExchange
为 true,则是否应自动删除交换(即在删除最后一个队列后删除)。违约:
true
. - 交换耐用
-
如果
declareExchange
是否为 true,则交换是否应该是持久的(即,它在代理重启后仍然存在)。违约:
true
. - 交换类型
-
交易所类型:
direct
,fanout
,headers
或topic
对于未分区的目标和direct
、标题或topic
用于分区目标。违约:
topic
. - 独家
-
是否创建专属消费者。 当这是
true
. 通常在需要严格排序但允许热备用实例在故障后接管时使用。 看recoveryInterval
,控制备用实例尝试使用的频率。 考虑使用singleActiveConsumer
而是在使用 RabbitMQ 3.8 或更高版本时。违约:
false
. - 到期
-
删除未使用的队列之前多长时间(以毫秒为单位)。
违约:
no expiration
- failedDeclarationRetryInterval (失败声明重试间隔)
-
尝试从队列中消耗(如果缺少)之间的间隔(以毫秒为单位)。
默认值:5000
- 框架最大净空
-
将堆栈跟踪添加到 DLQ 消息标头时要为其他标头保留的字节数。 所有标头都必须适合
frame_max
size 配置在代理上。 堆栈跟踪可能很大;如果大小加上此属性超过frame_max
则堆栈跟踪将被截断。 将写入 WARN 日志;考虑增加frame_max
或者通过捕获异常并抛出具有较小堆栈跟踪的异常来减少堆栈跟踪。默认值:20000
- headerPatterns
-
要从入站消息映射的标头的模式。
默认值:(所有标头)。
['*']
- 懒惰
-
使用
x-queue-mode=lazy
论点。 请参阅“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。违约:
false
. - 最大并发
-
使用者的最大数量。 当
containerType
是direct
.违约:
1
. - 最大长度
-
队列中的最大消息数。
违约:
no limit
- maxLength字节
-
队列中所有消息的最大总字节数。
违约:
no limit
- 最大优先级
-
队列中消息的最大优先级 (0-255)。
违约:
none
- missingQueues致命
-
当找不到队列时,是否将该条件视为致命并停止监听器容器。 默认为
false
以便容器不断尝试从队列中消费——例如,当使用集群并且托管非 HA 队列的节点关闭时。违约:
false
- 溢出行为
-
在以下情况下要采取的作
maxLength
或maxLengthBytes
已超过;现在drop-head
或reject-publish
但请参阅 RabbitMQ 文档。违约:
none
- 预取
-
预取计数。
违约:
1
. - 前缀
-
要添加到
destination
和队列。默认值:“”。
- queueBindingArguments
-
将队列绑定到交换时应用的参数;与
headers
exchangeType
以指定要匹配的标头。 例如…queueBindingArguments.x-match=any
,…queueBindingArguments.someHeader=someValue
.默认值:空
- queueDeclarationRetries
-
如果缺少队列,则重试从队列中使用的次数。 仅在以下情况下相关
missingQueuesFatal
是true
. 否则,容器会无限期地重试。 当containerType
是direct
.违约:
3
- 队列名称仅组
-
当 true 时,从名称等于
group
. 否则队列名称为destination.group
. 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中消费时,这很有用。默认值:false。
- quorum.deliveryLimit
-
什么时候
quorum.enabled=true
,设置传递限制,在此之后,邮件将被丢弃或死信。默认值:无 - 代理默认值将适用。
- 法定人数已启用
-
如果为 true,请创建仲裁队列而不是经典队列。
默认值:false
- quorum.initialGroup大小
-
什么时候
quorum.enabled=true
,设置初始仲裁大小。默认值:无 - 代理默认值将适用。
- 恢复间隔
-
连接恢复尝试之间的间隔(以毫秒为单位)。
违约:
5000
. - requeue已拒绝
-
禁用重试时是否应将传递失败重新排队,或者
republishToDlq
是false
.违约:
false
.
- 重新发布交付模式
-
什么时候
republishToDlq
是true
,指定重新发布的邮件的传递方式。违约:
DeliveryMode.PERSISTENT
- 重新发布到Dlq
-
默认情况下,重试用尽后失败的邮件将被拒绝。 如果配置了死信队列 (DLQ),则 RabbitMQ 会将失败的消息(原封不动)路由到 DLQ。 如果设置为
true
,则 Binder 会使用其他标头将失败的消息重新发布到 DLQ,包括异常消息和最终失败原因的堆栈跟踪。 另请参阅 frameMaxHeadroom 属性。违约:
true
- 单主动消费者
-
设置为 true 以设置
x-single-active-consumer
queue 属性设置为 true。违约:
false
- 交易
-
是否使用交易通道。
违约:
false
. - TTL的
-
声明时应用于队列的默认生存时间(以毫秒为单位)。
违约:
no limit
- tx大小
-
确认之间的交付次数。 当
containerType
是direct
.违约:
1
.
3.3. RabbitMQ Stream插件的初始消费者支持
现在提供了对 RabbitMQ Stream 插件的基本支持。
要启用此功能,您必须将spring-rabbit-stream
jar 到类路径 - 它必须与spring-amqp
和spring-rabbit
.
当您将containerType 属性设置为stream ; concurrency 仅支持超级流。
每个绑定只能使用一个流队列。 |
配置要使用的活页夹containerType=stream
,Spring Boot 将自动配置一个Environment
@Bean
从应用程序属性。
可以选择添加定制器来自定义侦听器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
这name
传递给定制器的参数是destination + '.' + group + '.container'
.
流name()
(用于偏移跟踪)设置为绑定destination + '.' + group
.
可以使用ConsumerCustomizer
如上所示。
如果您决定使用手动偏移跟踪,则Context
可用作邮件头:
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有关配置环境和消费者构建器的信息,请参阅 RabbitMQ Stream Java 客户端文档。
3.3.1. 对 RabbitMQ 超级流的消费者支持
有关超级流的信息,请参阅超级流。
使用超级流允许在超级流的每个分区上使用单个活动使用者进行自动纵向扩展、纵向缩减。
配置示例:
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
该框架将创建一个名为super
,有 9 个分区。
最多可以部署此应用程序的 3 个实例。
3.4. 高级侦听器容器配置
要设置未公开为绑定器或绑定属性的侦听器容器属性,请添加类型为ListenerContainerCustomizer
到应用程序上下文。
将设置活页夹和绑定属性,然后调用定制器。
定制器 (configure()
方法)与队列名称以及消费者组作为参数一起提供。
3.5. 高级队列/交换/绑定配置
RabbitMQ 团队会不时添加新功能,这些功能是通过在声明队列时设置一些参数来启用的。
通常,通过添加适当的属性在活页夹中启用此类功能,但这在当前版本中可能无法立即可用。
从 3.0.1 版开始,您现在可以将DeclarableCustomizer
bean(s) 添加到应用程序上下文中以修改Declarable
(Queue
,Exchange
或Binding
) 在执行声明之前。这允许您添加绑定器当前不直接支持的参数。
3.6. 接收批处理消息
使用 RabbitMQ 绑定器,使用者绑定处理两种类型的批处理:
3.6.1. 生产者创建的批次
通常,如果生产者绑定具有batch-enabled=true
(参见 Rabbit Producer 属性),或者消息是由BatchingRabbitTemplate
,批处理的元素将作为对侦听器方法的单独调用返回。
从 3.0 版开始,任何此类批次都可以显示为List<?>
如果spring.cloud.stream.bindings.<name>.consumer.batch-mode
设置为true
.
3.6.2. 消费者端批处理
从版本 3.1 开始,可以将使用者配置为将多个入站消息组合成一个批次,该批处理作为List<?>
转换后的有效负载。以下简单应用程序演示了如何使用此技术:
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<List<Thing>> input() {
return list -> {
System.out.println("Received " + list.size());
list.forEach(thing -> {
System.out.println(thing);
// ...
});
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1]
Thing [field=value2]
批处理中的消息数由batch-size
和receive-timeout
性能; 如果receive-timeout
在没有新消息的情况下,将传递一个“短”批次。
仅支持container-type=simple (默认值)。 |
如果要检查使用者端批处理邮件的标头,则应使用Message<List<?>>
; 标头是List<Map<String, Object>>
在标题中AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS
,以及相应索引中每个有效负载元素的标头。同样,这里有一个简单的例子:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<Message<List<Thing>>> input() {
return msg -> {
List<Thing> things = msg.getPayload();
System.out.println("Received " + things.size());
List<Map<String, Object>> headers =
(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
for (int i = 0; i < things.size(); i++) {
System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));
// ...
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue1");
return msg;
});
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue2");
return msg;
});
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getfield() {
return this.field;
}
public void setfield(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2
3.7. 兔子生产者属性
以下属性仅适用于 Rabbit 生产者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.
.
但是,如果需要将同一组属性应用于大多数绑定,请将
避免重复,Spring Cloud Stream 支持为所有通道设置值,
以spring.cloud.stream.rabbit.default.<property>=<value>
.
另外,请记住,绑定特定属性将覆盖其默认中的等效属性。
- altermateExchange.binding.queue
-
如果交换尚不存在,并且
name
,则将此队列绑定到备用交换。将供应一个没有参数的简单持久队列;如果需要更复杂的配置,则必须自行配置和绑定队列。违约:
null
alternateExchange.binding.routing键如果交换尚不存在,并且name
和queue
,则使用此路由密钥将队列绑定到备用交换。默认值:(对于默认值
#
topic
替代交换) - alternateExchange.exists
-
备用交换是否存在,或者是否需要预配。
违约:
false
- alternateExchange.类型
-
如果备用交换尚不存在,则要预配的交换类型。
违约:
topic
- alternateExchange.name
-
在目标交换机上配置备用交换机。
违约:
null
- 自动绑定Dlq
-
是否自动声明 DLQ 并将其绑定到绑定程序 DLX。
违约:
false
. - 批处理已启用
-
是否由生产者启用消息批处理。 消息根据以下属性批处理为一条消息(在此列表中的接下来的三个条目中描述): 'batchSize',
batchBufferLimit
和batchTimeout
. 有关详细信息,请参阅批处理。 另请参阅接收批处理消息。违约:
false
. - batch大小
-
启用批处理时要缓冲的消息数。
违约:
100
. - batch缓冲限制
-
启用批处理时的最大缓冲区大小。
违约:
10000
. - batchTimeout
-
启用批处理时的批处理超时。
违约:
5000
. - bindingRoutingKey
-
用于将队列绑定到交换的路由密钥(如果
bindQueue
是true
). 可以是多个键 - 请参阅bindingRoutingKeyDelimiter
. 对于分区目标,-n
附加到每个键。仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。违约:。
#
- bindingRoutingKeyDelimiter
-
当它不为空时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
null
. - 绑定队列
-
是否声明队列并将其绑定到目标交换。 将其设置为
false
如果您已经设置了自己的基础设施,并且之前已经创建并绑定了队列。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。违约:
true
. - 压缩
-
发送时是否应压缩数据。
违约:
false
. - 确认AckChannel
-
什么时候
errorChannelEnabled
为 true,则是向其发送正面投放确认(又称发布者确认)的通道。 如果通道不存在,则DirectChannel
以此名称注册。 必须将连接工厂配置为启用发布者确认。 相互排斥useConfirmHeader
.违约:
nullChannel
(acks 被丢弃)。 - deadLetter队列名称
-
DLQ 的名称 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
prefix+destination.dlq
- 死信交换
-
要分配给队列的 DLX。 仅在以下情况下相关
autoBindDlq
是true
. 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:“前缀+DLX”
- deadLetter交换类型
-
要分配给队列的 DLX 类型。 仅当
autoBindDlq
是true
. 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:“直接”
- deadLetterRouting键
-
要分配给队列的死信路由键。 仅在以下情况下相关
autoBindDlq
是true
. 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。违约:
destination
- 声明Dlx
-
是否为目的地申报死信交换。 仅当
autoBindDlq
是true
. 设置为false
如果您有预配置的 DLX。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。违约:
true
. - 声明交换
-
是否声明目标的交换。
违约:
true
. - 延迟表达式
-
用于评估要应用于消息的延迟的 SpEL 表达式 (
x-delay
标头)。 如果交换不是延迟消息交换,则无效。默认值:否
x-delay
header 已设置。 - 延迟交换
-
是否将交易所声明为
Delayed Message Exchange
. 需要代理上的延迟消息交换插件。 这x-delayed-type
参数设置为exchangeType
.违约:
false
. - 交付模式
-
交付模式。
违约:
PERSISTENT
. - dlqBinding参数
-
将 dlq 绑定到死信交换时应用的参数;与
headers
deadLetterExchangeType
以指定要匹配的标头。 例如…dlqBindingArguments.x-match=any
,…dlqBindingArguments.someHeader=someValue
. 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:空
- dlq死信交换
-
声明 DLQ 时,要分配给该队列的 DLX。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
none
- dlqDeadLetterRoutingKey
-
声明 DLQ 时,要分配给该队列的死信路由密钥。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
none
- dlq过期
-
删除未使用的死信队列之前多长时间(以毫秒为单位)。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
no expiration
- dlq懒惰
-
使用
x-queue-mode=lazy
论点。 请参阅“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。 - dlq最大长度
-
死信队列中的最大消息数。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
no limit
- dlqMaxLength字节
-
所有消息的死信队列中的最大总字节数。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
no limit
- dlqMax优先级
-
死信队列中消息的最大优先级 (0-255) 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
none
- dlqQuorum.deliveryLimit
-
什么时候
quorum.enabled=true
,设置传递限制,在此之后,邮件将被丢弃或死信。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:无 - 代理默认值将适用。
- dlqQuorum.enabled
-
如果为 true,请创建仲裁死信队列,而不是经典队列。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。默认值:false
- dlqQuorum.initialGroupSize
-
什么时候
quorum.enabled=true
,设置初始仲裁大小。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:无 - 代理默认值将适用。
- dlqSingleActiveConsumer
-
设置为 true 以设置
x-single-active-consumer
queue 属性设置为 true。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。违约:
false
- dlqTtl
-
声明时应用于死信队列的默认生存时间(以毫秒为单位)。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
no limit
- 交易所自动删除
-
如果
declareExchange
是true
,交换是否应自动删除(在删除最后一个队列后将其删除)。违约:
true
. - 交换耐用
-
如果
declareExchange
是true
,交易所是否应该持久(在代理重启后幸存)。违约:
true
. - 交换类型
-
交易所类型:
direct
,fanout
,headers
或topic
对于未分区的目标和direct
,headers
或topic
用于分区目标。违约:
topic
. - 到期
-
删除未使用的队列之前多长时间(以毫秒为单位)。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
no expiration
- headerPatterns
-
要映射到出站邮件的标头的模式。
默认值:(所有标头)。
['*']
- 懒惰
-
使用
x-queue-mode=lazy
论点。 请参阅“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。违约:
false
. - 最大长度
-
队列中的最大消息数。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
no limit
- maxLength字节
-
队列中所有消息的最大总字节数。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
no limit
- 最大优先级
-
队列中消息的最大优先级 (0-255)。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
none
- 前缀
-
要添加到
destination
交换。默认值:“”。
- 生产者类型
-
生产者的类型。
-
AMQP
用于经典队列和仲裁队列的 AMQP 客户端 -
STREAM_SYNC
RabbitMQ Streams 插件客户端,阻塞直到收到确认 -
STREAM_ASYNC
RabbitMQ Streams 插件客户端,不阻塞默认值:“”。
-
- queueBindingArguments
-
将队列绑定到交换时应用的参数;与
headers
exchangeType
以指定要匹配的标头。 例如…queueBindingArguments.x-match=any
,…queueBindingArguments.someHeader=someValue
. 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:空
- 队列名称仅组
-
什么时候
true
,从名称等于group
. 否则队列名称为destination.group
. 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中消费时,这很有用。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:false。
- quorum.deliveryLimit
-
什么时候
quorum.enabled=true
,设置传递限制,在此之后,邮件将被丢弃或死信。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:无 - 代理默认值将适用。
- 法定人数已启用
-
如果为 true,请创建仲裁队列而不是经典队列。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。默认值:false
- quorum.initialGroup大小
-
什么时候
quorum.enabled=true
,设置初始仲裁大小。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。默认值:无 - 代理默认值将适用。
- 路由键表达式
-
用于确定发布消息时要使用的路由键的 SpEL 表达式。 对于固定路由键,请使用
routingKey
.违约:
destination
或destination-<partition>
用于分区目标。 - 路由键
-
定义发布邮件时要使用的固定路由键的字符串。
默认值:请参阅
routingKeyExpression
- 单主动消费者
-
设置为 true 以设置
x-single-active-consumer
queue 属性设置为 true。 仅适用于以下情况requiredGroups
提供,然后仅提供给这些组。违约:
false
- 交易
-
是否使用交易通道。
违约:
false
. - TTL的
-
声明时应用于队列的默认生存时间(以毫秒为单位)。 仅适用于以下情况
requiredGroups
提供,然后仅提供给这些组。违约:
no limit
- 使用ConfirmHeader
-
请参阅发布者确认。 相互排斥
confirmAckChannel
.对于 RabbitMQ,内容类型标头可以由外部应用程序设置。 Spring Cloud Stream 将它们作为用于任何类型传输的扩展内部协议的一部分,包括本机不支持标头的传输,例如 Kafka(0.11 之前)。
3.8. 发布者确认
有两种机制可以获取发布消息的结果;在每种情况下,连接工厂都必须具有publisherConfirmType
设置ConfirmType.CORRELATED
.
“遗留”机制是将confirmAckChannel
到消息通道的 bean 名称,您可以从中异步检索确认;负 ACK 将发送到错误通道(如果启用)- 请参阅错误通道。
3.1 版中添加的首选机制是使用相关数据头并通过其等待结果Future<Confirm>
财产。
这对于批处理侦听器特别有用,因为您可以在等待结果之前发送多条消息。
要使用此技术,请将useConfirmHeader
属性设置为 true
以下简单应用程序是使用此技术的示例:
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true
spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
private StreamBridge bridge;
@Bean
Consumer<List<String>> input() {
return list -> {
List<MyCorrelationData> results = new ArrayList<>();
list.forEach(str -> {
log.info("Received: " + str);
MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
results.add(corr);
this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
.build());
});
results.forEach(correlation -> {
try {
Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
log.info(confirm + " for " + correlation.getPayload());
if (correlation.getReturnedMessage() != null) {
log.error("Message for " + correlation.getPayload() + " was returned ");
// throw some exception to invoke binder retry/error handling
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
catch (ExecutionException | TimeoutException e) {
throw new IllegalStateException(e);
}
});
};
}
@Bean
public ApplicationRunner runner(BatchingRabbitTemplate template) {
return args -> IntStream.range(0, 10).forEach(i ->
template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
}
@Bean
public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
}
}
class MyCorrelationData extends CorrelationData {
private final String payload;
MyCorrelationData(String id, String payload) {
super(id);
this.payload = payload;
}
public String getPayload() {
return this.payload;
}
}
如您所见,我们发送每条消息,然后等待发布结果。 如果无法路由消息,则在未来完成之前,将使用返回的消息填充关联数据。
相关数据必须提供唯一的id 以便框架可以执行关联。 |
不能同时设置两者useConfirmHeader
和confirmAckChannel
但是,在以下情况下,您仍然可以在错误通道中接收返回的消息useConfirmHeader
是 true,但使用 correlation 标头更方便。
3.9. RabbitMQ Stream插件的初始生产者支持
现在提供了对 RabbitMQ Stream 插件的基本支持。
要启用此功能,您必须将spring-rabbit-stream
jar 到类路径 - 它必须与spring-amqp
和spring-rabbit
.
当您将producerType 属性设置为STREAM_SYNC 或STREAM_ASYNC . |
将绑定器配置为使用流ProducerType
,Spring Boot 将配置一个Environment
@Bean
从应用属性。
可以选择添加定制器来自定义消息处理程序。
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
有关配置环境和生产者构建器的信息,请参阅 RabbitMQ Stream Java 客户端文档。
3.9.1. 生产者对 RabbitMQ 超级流的支持
有关超级流的信息,请参阅超级流。
使用超级流允许在超级流的每个分区上使用单个活动使用者进行自动纵向扩展、纵向缩减。 使用 Spring Cloud Stream,您可以通过 AMQP 或使用流客户端发布到超级流。
超级流必须已经存在;生产者绑定不支持创建超级流。 |
通过 AMQP 发布到超级流:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端发布到超级流:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端时,如果将confirmAckChannel
,则成功发送的消息的副本将发送到该通道。
4. 使用现有队列/交换
默认情况下,绑定程序将自动预配主题交换,其名称派生自目标绑定属性的值<prefix><destination>
.
目标默认为绑定名称(如果未提供)。
绑定使用者时,将自动预配一个队列,名称为<prefix><destination>.<group>
(如果group
binding 属性),或者如果没有group
.
队列将使用“match-all”通配符路由键 () 绑定到交换,用于非分区绑定或#
<destination>-<instanceIndex>
用于分区绑定。前缀为空String
默认情况下。如果输出绑定指定为requiredGroups
,将为每个组预配一个队列/绑定。
有许多特定于 rabbit 的绑定属性允许您修改此默认行为。
如果您希望使用现有的交换/队列,则可以完全禁用自动配置,如下所示,假设交换名为myExchange
队列命名为myQueue
:
-
spring.cloud.stream.bindings.<binding name>.destination=myExchange
-
spring.cloud.stream.bindings.<binding name>.group=myQueue
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
如果您希望 Binder 预配队列/交换,但又想使用此处讨论的默认值以外的其他内容来执行此作,请使用以下属性。有关更多信息,请参阅上面的属性文档。
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>
-
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'
声明死信交换/队列时使用类似的属性,当autoBindDlq
是true
.
5. 使用 RabbitMQ Binder 重试
在 Binder 中启用重试时,侦听器容器线程将在配置的任何回退期内挂起。当需要对单个使用者进行严格排序时,这可能很重要。但是,对于其他用例,它会阻止在该线程上处理其他消息。使用绑定器重试的替代方法是在死信队列 (DLQ) 上设置生存时间以及 DLQ 本身上的死信配置。有关此处讨论的属性的更多信息,请参阅“RabbitMQ Binder 属性”。您可以使用以下示例配置来启用此功能:
-
设置
autoBindDlq
自true
. 活页夹创建 DLQ。 或者,您可以在deadLetterQueueName
. -
设置
dlqTtl
到您要在重新投放之间等待的回退时间。 -
将
dlqDeadLetterExchange
到默认交易所。 来自 DLQ 的过期消息将路由到原始队列,因为默认的deadLetterRoutingKey
是队列名称 (destination.group
). 设置为默认交换是通过设置没有值的属性来实现的,如下一个示例所示。
要强制消息为死信,请抛出AmqpRejectAndDontRequeueException
或将requeueRejected
自false
(默认值)并抛出任何异常。
循环无休止地继续下去,这对于暂时性问题来说很好,但您可能想在尝试几次后放弃。
幸运的是,RabbitMQ 提供了x-death
标头,可让您确定发生了多少个周期。
要在放弃后确认消息,请抛出一个ImmediateAcknowledgeAmqpException
.
5.1. 把它们放在一起
以下配置创建交换myDestination
带队列myDestination.consumerGroup
绑定到具有通配符路由密钥的主题交换:#
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
此配置创建绑定到直接交换 (DLX
),路由键为myDestination.consumerGroup
.
当消息被拒绝时,它们将被路由到 DLQ。
5秒后,消息过期,使用队列名称作为路由键路由到原始队列,如以下示例所示:
@SpringBootApplication
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
Map<?,?> death = message.getHeaders().get("x-death");
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
};
}
}
请注意,count 属性中的x-death
header 是一个Long
.
6. 错误通道
从 1.3 版开始,绑定器无条件地将异常发送到每个使用者目标的错误通道,也可以配置为将异步生产者发送失败发送到错误通道。 有关更多信息,请参阅“[spring-cloud-stream-overview-error-handling]”。
RabbitMQ 有两种类型的发送失败:
-
返回的消息,
-
负面确认的出版商确认。
后者很少见。
根据 RabbitMQ 文档,“只有当负责队列的 Erlang 进程中发生内部错误时,才会传递 [a nack]。
如果将reject-publish
队列溢出行为。
除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中所述)外,RabbitMQ 绑定器仅在正确配置连接工厂时才向通道发送消息,如下所示:
-
ccf.setPublisherConfirms(true);
-
ccf.setPublisherReturns(true);
对连接工厂使用 Spring Boot 配置时,请设置以下属性:
-
spring.rabbitmq.publisher-confirms
-
spring.rabbitmq.publisher-returns
的有效负载ErrorMessage
对于返回的消息,是ReturnedAmqpMessageException
具有以下属性:
-
failedMessage
:春季消息Message<?>
未能发送。 -
amqpMessage
:原始 spring-amqpMessage
. -
replyCode
:指示失败原因的整数值(例如,312 - 无路由)。 -
replyText
:指示失败原因的文本值(例如,NO_ROUTE
). -
exchange
:将消息发布到的交换。 -
routingKey
:发布邮件时使用的路由密钥。
另请参阅发布者确认,了解接收返回消息的替代机制。
对于否定确认,有效负载是NackedAmqpMessageException
具有以下属性:
-
failedMessage
:春季消息Message<?>
未能发送。 -
nackReason
:原因(如果可用 - 您可能需要检查代理日志以获取更多信息)。
不会自动处理这些异常(例如发送到死信队列)。 您可以在自己的 Spring Integration 流中使用这些异常。
7. 兔活页夹健康指标
Rabbit binder 的运行状况指示器委托给 Spring Boot 提供的运行状况指示器。 有关这方面的更多信息,请参阅此。
您可以使用属性 - 在活页夹级别禁用此运行状况指示器management.health.binders.enabled
并将其设置为false
.
对于多活页夹环境,必须在活页夹的环境属性上设置。
禁用运行状况指示器后,应在运行状况执行器终结点中看到如下内容:
"rabbit": {
"status": "UNKNOWN"
}
在 Spring Boot 级别,如果要禁用 Rabbit 健康指示器,则需要使用management.health.rabbit.enabled
并设置为false
.
8. 死信队列处理
由于无法预测用户希望如何处理死信邮件,因此该框架不提供任何标准机制来处理它们。
如果死信的原因是暂时的,您可能希望将消息路由回原始队列。
但是,如果问题是永久性问题,则可能会导致无限循环。
以下 Spring Boot 应用程序显示了一个示例,说明如何将这些消息路由回原始队列,但在尝试三次后将它们移动到第三个“停车场”队列。
第二个示例使用 RabbitMQ 延迟消息交换来引入重新排队消息的延迟。
在此示例中,每次尝试的延迟都会增加。
这些示例使用@RabbitListener
接收来自 DLQ 的消息。
您还可以使用RabbitTemplate.receive()
在批处理中。
这些示例假定原始目标为so8400in
消费群体是so8400
.
8.1. 非分区目标
前两个示例适用于未对目标进行分区的情况:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8.2. 分区目标
对于分区目标,所有分区都有一个 DLQ。我们从标头确定原始队列。
8.2.1.republishToDlq=false
什么时候republishToDlq
是false
,RabbitMQ 将消息发布到 DLX/DLQ,并使用x-death
标头,其中包含有关原始目标的信息,如以下示例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8.2.2.republishToDlq=true
什么时候republishToDlq
是true
,则重新发布恢复器会将原始交换和路由密钥添加到标头,如以下示例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
9. 使用 RabbitMQ Binder 进行分区
RabbitMQ 原生不支持分区。
有时,将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时,特定客户的所有消息都应转到同一分区。
这RabbitMessageChannelBinder
通过将每个分区的队列绑定到目标交换来提供分区。
以下 Java 和 YAML 示例显示了如何配置生产者:
@SpringBootApplication
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
前面示例中的配置使用默认分区 ( 这 |
以下配置预配主题交换:

以下队列绑定到该交换:

以下绑定将队列与交换相关联:

以下 Java 和 YAML 示例延续了前面的示例,并展示了如何配置使用者:
@SpringBootApplication
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}
}
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
这RabbitMessageChannelBinder 不支持动态缩放。
每个分区必须至少有一个使用者。
消费者的instanceIndex 用于指示使用哪个分区。
Cloud Foundry 等平台只能有一个实例,具有instanceIndex . |