Rabbit MQ Binder
参考指南
本指南介绍了RabbitMQ对Spring Cloud StreamBinder的实现。它包含有关其设计、使用和配置选项的信息,以及有关StreamCloudStream概念如何映射到RabbitMQ特定构造的信息。
1. 用法
要使用RabbitMQ绑定器,您可以将其添加到您的Spring Cloud流应用程序中,方法是使用以下Maven坐标:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
相反,您可以使用Spring Cloud Stream RabbitMQStarters,如下所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2. RabbitMQ Binder 概述
以下简化示意图展示了RabbitMQ binder的运行方式:
默认情况下,RabbitMQ 绑定实现将每个目标映射到TopicExchange。对于每个消费者组,Queue绑定到该TopicExchange。每个消费者实例都有一个对应的RabbitMQConsumer实例,用于其组的Queue。对于分区生产者和使用者,队列使用分区索引后缀,并将分区索引作为路由密钥。对于匿名使用者(没有group属性),将使用自动删除队列(带随机唯一名称)。
通过可选的autoBindDlq选项,您可以配置绑定器创建并配置死信队列(DLQs)(以及一个死信交换DLX,以及路由基础设施)。默认情况下,死信队列的名称为目的地名称加上 .dlq。如果启用了重试(maxAttempts > 1),则在重试用尽后,会将失败的消息传递到死信队列。如果重试被禁用(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便失败的消息被路由到DLQ,而不是重新排队。
另外,republishToDlq 会导致绑定程序将失败的消息发布到 DLQ(而不是拒绝它)。This feature lets additional information (such as the stack trace in the x-exception-stacktrace header) be added to the message in headers.查看frameMaxHeadroom属性,了解有关截断堆栈跟踪的信息。这个选项不需要启用重试。您可以仅在一次尝试后重新发布失败的消息。
Starting with version 1.
2,您可以配置重新发布消息的交付模式。参见republishDeliveryMode属性。
如果流侦听器抛出一个ImmediateAcknowledgeAmqpException,则会跳过DLQ,并且消息将被简单地丢弃。从版本2.1开始,无论republishToDlq如何设置,这都是正确的;在以前,只有当republishToDlq为false时才成立。
将 requeueRejected 设置为 true(使用 republishToDlq=false )会导致消息被重新排队并不断重新传递,除非故障是瞬态的,否则这可能不是您想要的操作。 通常,您应通过将 maxAttempts 设置为大于一或将 republishToDlq 设置为 true 来在绑定器中启用重试。 |
从版本 3.1.2 开始,如果消费者标记为 transacted,向死信队列(DLQ)发布消息将参与事务。
这允许在由于某些原因导致发布失败时回滚事务(例如,用户没有权限向死信交换机发布消息)。
此外,如果连接工厂配置了发布确认或返回机制,则向 DLQ 发布消息会等待确认并检查是否有返回的消息。
如果收到否定确认或返回消息,绑定器将抛出 AmqpRejectAndDontRequeueException,使代理可以像设置了 republishToDlq 属性为 false 一样处理向 DLQ 的发布。
有关这些属性的更多信息,请参阅RabbitMQ绑定器属性。
该框架不提供任何标准机制来处理死信消息(或将其重新路由回主队列)。死信队列处理中描述了一些选项。
当在Spring Cloud Stream应用程序中使用多个RabbitMQ绑定器时,禁用'RabbitAutoConfiguration'很重要,以避免相同的配置从RabbitAutoConfiguration应用于两个绑定器。您可以使用 @SpringBootApplication注解来排除该类。 |
从版本2.0开始,RabbitMessageChannelBinder 将 RabbitTemplate.userPublisherConnection 属性设置为 true,以便非事务性生产者避免死锁,这可能会导致由于代理上的内存警报而导致缓存连接被阻塞。
| 当前,仅支持消息驱动的消费者(即单个消费者监听多个队列);轮询式消费者只能从单一队列中检索消息。 |
3. 配置选项
本节包含与 RabbitMQ 绑定程序和绑定通道相关的设置。
有关通用绑定配置选项和属性的信息,请参阅Spring Cloud Stream核心文档。
3.1. RabbitMQ Binder 属性
默认情况下,RabbitMQ 绑定器使用 Spring Boot 的 ConnectionFactory。因此,它支持 RabbitMQ 的所有 Spring Boot 配置选项。(有关参考,请参阅 Spring Boot 文档)。RabbitMQ 配置选项使用 spring.rabbitmq 前缀。
除了Spring Boot选项之外,RabbitMQ绑定器还支持以下属性:
- spring.cloud.stream.rabbit.binder.adminAddresses
-
一个逗号分隔的 RabbitMQ 管理插件 URL 列表。仅在
nodes包含多个条目时使用。spring.rabbitmq.addresses中必须有对应的条目。如果您使用 RabbitMQ 集群并希望从托管队列的节点消费,则需要此列表。有关更多信息,请参阅队列亲和性和 LocalizedQueueConnectionFactory。默认:空。
- spring.cloud.stream.rabbit.binder.nodes
-
用逗号分隔的 RabbitMQ 节点名称列表。
当有多个条目时,用于定位队列所在的服务器地址。
此列表中的每个条目都必须在spring.rabbitmq.addresses中有相应的条目。
只有在使用 RabbitMQ 集群并且希望从托管队列的节点消费时才需要。
有关更多信息,请参阅队列关联性和本地化队列连接工厂。默认:空。
- spring.cloud.stream.rabbit.binder.compressionLevel
-
压缩绑定的压缩级别。
参见java.util.zip.Deflater。默认:
1(BEST_LEVEL)。 - spring.cloud.stream.binder.connection-name-prefix
-
此绑定器创建的连接使用的连接名称前缀。 每次打开新的连接时,
#n会递增。
名称为此前缀后跟#n,其中n每次打开新连接时都会递增。默认值:无(Spring AMQP 默认值)。
3.2. RabbitMQ 消费者属性
以下属性仅适用于 Rabbit 消费者,并且必须以 spring.cloud.stream.rabbit.bindings.<channelName>.consumer. 为前缀。
但是,如果需要将同一组属性应用于大多数绑定,为了防止重复,Spring Cloud Stream 支持在格式为 spring.cloud.stream.rabbit.default.<property>=<value> 的所有通道上设置值。
也请记住,绑定特定属性会将其默认值覆盖。
- 确认模式
-
确认模式。
默认值:
AUTO。 - 匿名组前缀
-
当绑定没有
group属性时,会将一个匿名、自动删除的队列绑定到目标交换机。
此类队列的默认命名策略会导致队列被命名为anonymous.<base64 representation of a UUID>。
设置此属性可更改为除默认值以外的前缀。默认值:
anonymous.。 - 自动绑定死信队列
-
是否自动声明死信队列(DLQ)并将其绑定到绑定器的死信交换(DLX)。
默认值:
false。 - 绑定路由键
-
用于将队列绑定到交换机的路由键(如果
bindQueue是true)。可以有多个键——参见bindingRoutingKeyDelimiter。对于分区目的地,将在每个键后追加-<instanceIndex>。默认值:
#。 - 绑定路由键分隔符
-
当此值不为 null 时,将把 'bindingRoutingKey' 视为由该值分隔的键列表;通常使用逗号作为分隔符。
默认值:
null。 - 绑定队列
-
是否声明队列并将其绑定到目标交换机。
如果已设置自己的基础设施并且先前创建并绑定了队列,请将其设置为false。默认值:
true。 - 消费者标记前缀
-
用于创建消费者标签;每次创建一个消费者时,将会附加
#n。
示例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}。默认值:无 - 代理将生成随机的消费者标签。
- 容器类型
-
选择要使用的监听器容器类型。 有关更多信息,请参阅Spring AMQP文档中的选择容器部分。 另请参见[rabbitmq-stream]。
默认值:
simple - 死信队列名称
-
死信队列的名称
默认值:
prefix+destination.dlq - 死信交换
-
一个DLX分配到队列。 仅当
autoBindDlq为true时相关。默认值:'前缀+死信交换'
- 死信交换机类型
-
要分配给队列的DLX的类型。
仅当autoBindDlq为true时相关。默认值:'direct'
- 死信路由键
-
分配给队列的死信路由密钥。仅在
autoBindDlq是true时相关。默认值:
destination - 声明死信交换器
-
是否为目的地声明死信交换? 相关仅当
autoBindDlq为true时。 若已预配置DLX,请设为false。默认值:
true。 - 声明交换机
-
是否为目的地声明交换机。
默认值:
true。 - 延迟交换
-
是否将交换声明为
Delayed Message Exchange。
需要在代理上安装延迟消息交换插件。
将x-delayed-type参数设置为exchangeType。默认值:
false。 - 延迟队列绑定参数
-
绑定死信交换时应用于死信队列(dlq)的参数;与
headersdeadLetterExchangeType结合使用,用于指定要匹配的头部信息。例如…dlqBindingArguments.x-match=any,…dlqBindingArguments.someHeader=someValue。默认:空
- 死信队列死信交换机
-
如果声明了一个死信队列(DLQ),则需指定一个死信交换器(DLX)来分配给该队列。
默认值:
none - 死信队列死信路由键
-
如果声明了死信队列,则需要指定一个分配给该队列的死信路由键。
默认值:
none - 延迟队列到期
-
未使用的死信队列在被删除前的等待时间(以毫秒为单位)。
默认值:
no expiration - 懒加载
-
使用
x-queue-mode=lazy参数声明死信队列。参见“延迟队列”。建议改用策略而不是此设置,因为策略允许在不删除队列的情况下更改该设置。默认值:
false。 - 最大dlq长度
-
死信队列中的最大消息数。
默认值:
no limit - 最大字节数
-
死信队列中所有消息的最大总字节数。
默认值:
no limit - 最大优先级队列
-
死信队列中消息的最大优先级(0-255)。
默认值:
none - 队列溢出时的行为
-
当超过
dlqMaxLength或dlqMaxLengthBytes时要采取的操作;目前为drop-head或reject-publish,但请参考RabbitMQ文档。默认值:
none - dlqQuorum.deliveryLimit
-
当
quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转为死信。默认:无 - 将应用代理程序默认设置。
- dlqQuorum.enabled
-
当为 true 时,创建一个法定人数死信队列而不是经典队列。
(默认值:false)
- dlqQuorum.initialGroupSize
-
当
quorum.enabled=true时,设置初始法定人数大小。默认:无 - 将应用代理程序默认设置。
- 单个活动消费者
-
设置为 true 可将
x-single-active-consumer队列属性设为 true。默认值:
false - dlqTtl
-
声明死信队列时应用的默认生存时间(以毫秒为单位)。
默认值:
no limit - 持久化订阅
-
订阅是否应该持久化。只有在同时设置
group时才有效。默认值:
true。 - 交换自动删除
-
如果
declareExchange为真,则表示交换机是否应该自动删除(即在最后一个队列被删除后将其移除)。默认值:
true。 - 交换机持久化
-
如果
declareExchange为真,则表示交换机是否应具有持久性(即,在代理重启后仍然存在)。默认值:
true。 - 交易类型
-
交换类型:
direct、fanout、headers或topic表示非分区目的地,而direct、标题或topic表示分区目的地。默认值:
topic。 - 独家
-
是否创建独占消费者。 当此值为
true时,并发应设置为 1。 通常在需要严格顺序但启用热备用实例以在发生故障后接管时使用。 参见recoveryInterval,它控制备用实例尝试消费的频率。 当使用 RabbitMQ 3.8 或更高版本时,建议改用singleActiveConsumer。默认值:
false。 - 到期
-
多久以后未使用的队列会被删除(以毫秒为单位)。
默认值:
no expiration - 声明失败重试间隔
-
如果队列缺失,则尝试从队列中消耗之间的间隔(以毫秒为单位)。
默认值:5000
- 框架最大回程
-
将堆栈跟踪添加到死信队列(DLQ)消息头时,为其他标头保留的字节数。所有标头必须符合代理上配置的
frame_max大小。
堆栈跟踪可能很大;如果此属性加上堆栈跟踪的大小超过frame_max,则堆栈跟踪将被截断。
将会记录一个警告日志;考虑增加frame_max或通过捕获异常并抛出自定义较小堆栈跟踪的异常来减少堆栈跟踪。默认值:20000
- headerPatterns
-
从入站消息映射标头的模式。
默认:
['*'](所有头部)。 - 懒加载
-
使用
x-queue-mode=lazy参数声明队列。
参见 “延迟队列”。
建议改用策略,而不是此设置,因为策略允许在不删除队列的情况下更改设置。默认值:
false。 - 最大并发数
-
消费者的最大数量。
当containerType为direct时不支持。默认值:
1。 - 最大长度
-
队列中消息的最大数量。
默认值:
no limit - 最大长度字节
-
队列中所有消息的最大总字节数。
默认值:
no limit - 最高严重程度
-
队列中消息的最大优先级(0-255)。
默认值:
none - 缺少队列致命
-
当找不到队列时,是否将此情况视为致命错误并停止监听器容器。默认值为
false,因此容器会不断尝试从队列中消费消息——例如,在使用集群时,如果托管非高可用性队列的节点宕机。默认值:
false - 溢出行为
-
当超过
maxLength或maxLengthBytes时要采取的操作;目前为drop-head或reject-publish,但请参考RabbitMQ文档。默认值:
none - 预取
-
预取计数。
默认值:
1。 - 前缀
-
要添加到
destination和队列名称前缀。默认值:""。
- 队列绑定参数
-
绑定队列到交换时应用的参数;用于指定要匹配的标头,例如
…queueBindingArguments.x-match=any、…queueBindingArguments.someHeader=someValue。
使用headers或exchangeType来指定要匹配的标头。默认:空
- 队列声明重试次数
-
如果队列缺失,从队列中重新消费的重试次数。
仅当missingQueuesFatal是true时有效。
否则,容器将无限期地重试。
不支持当containerType是direct的情况。默认值:
3 - 仅队列名组
-
当为 true 时,从队列名称等于
group的队列中消费。否则队列名称为destination.group。例如,在使用 Spring Cloud Stream 来从现有的 RabbitMQ 队列进行消费时,这很有用。默认值:false。
- quorum.deliveryLimit
-
当
quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转为死信。默认:无 - 将应用代理程序默认设置。
- quorum.enabled
-
为真时,创建一个法定人数队列而不是经典队列。
(默认值:false)
- quorum.initialGroupSize
-
当
quorum.enabled=true时,设置初始法定人数大小。默认:无 - 将应用代理程序默认设置。
- 恢复间隔
-
连接恢复尝试之间的间隔,单位为毫秒。
默认值:
5000。 - 重新排队拒绝的
-
当重试被禁用或
republishToDlq时,是否应重新排队交付失败的消息。false表示是。默认值:
false。
- 重新发布配送模式
-
当
republishToDlq是true时,指定重新发布的消息的传递模式。默认值:
DeliveryMode.PERSISTENT - 转发到死信队列
-
默认情况下,重试次数用尽后仍失败的消息会被拒绝。如果配置了死信队列(DLQ),RabbitMQ会将失败的消息(未更改)路由到DLQ。如果设置为
true,绑定器会将失败的消息重新发布到DLQ,并添加额外的标题,包括最终失败原因中的异常消息和堆栈跟踪。另请参阅frameMaxHeadroom属性。默认值:
true - singleActiveConsumer
-
设置为 true 可将
x-single-active-consumer队列属性设为 true。默认值:
false - 事务性
-
是否使用事务通道。
默认值:
false。 - TTL
-
声明队列时要应用的默认存活时间(以毫秒为单位)。
默认值:
no limit - 字体大小
-
在确认之间交付的数量。
当containerType是direct时不支持。默认值:
1。
3.3. RabbitMQ 流插件的初始消费者支持
基本支持 RabbitMQ 流插件 现已提供。
要启用此功能,必须向类路径添加 spring-rabbit-stream jar 文件 - 它必须与 spring-amqp 和 spring-rabbit 版本相同。
上面描述的消费者属性在设置containerType属性为stream时不支持;仅支持超流上的concurrency。
每个绑定只能由一个流队列消费。 |
要让绑定程序使用containerType=stream,Spring Boot会自动从应用程序属性中配置一个Environment@Bean。您还可以可选地添加一个定制器来自定义侦听器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
传递给自定义器的 name 参数是 destination + '.' + group + '.container'。
流name()(用于偏移量跟踪)被设置为绑定destination + '.' + group。可以使用上图中的ConsumerCustomizer进行更改。如果您决定使用手动偏移量跟踪,则可以通过消息头获取Context:
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有关配置环境和消费者构建器的信息,请参阅RabbitMQ Stream Java客户端文档。
3.3.1. RabbitMQ 超级流的消费者支持
见 Super Streams 以了解有关超级流的信息。
使用超级流允许在超级流的每个分区上仅有一个活动消费者时自动扩展和缩减。
配置示例:
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
框架将创建一个名为 super 的超级流,包含 9 个分区。
最多可以部署此应用程序的 3 个实例。
3.4. 高级监听器容器配置
要设置未通过绑定程序或绑定属性公开的侦听器容器属性,请向应用程序上下文中添加一个 0 类型的 bean。
将设置绑定程序和绑定属性,然后调用自定义程序。
自定义程序(1 方法)将提供队列名称以及消费者组作为参数。
3.5. 高级队列/交换机/绑定配置
每隔一段时间,RabbitMQ团队都会添加一些新功能,这些功能需要在声明时设置某些参数才能启用,例如,一个队列。通常,通过添加适当的属性来启用此类功能,但在当前版本中可能不会立即可用。从版本 3.0.1 开始,您可以现在向应用程序上下文添加 DeclarableCustomizer 个 bean(s),以便在声明之前进行修改。这允许您添加当前未直接由绑定器支持的参数。
3.6. 接收批处理消息
使用 RabbitMQ 绑定程序时,消费者绑定会处理两种类型的批次:
3.6.1. 生产者创建的批次
通常,如果生产者绑定具有batch-enabled=true(参见Rabbit Producer Properties),或者消息是由BatchingRabbitTemplate创建的,则批次中的元素将作为对监听器方法的单独调用返回。
从版本3.0开始,如果将spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,则可以将此类任何批次呈现为对监听器方法的List<?>。
3.6.2. 消费者端批处理
从版本 3.1 开始,可以配置消费者将多个传入消息组合成一个批次,该批次将以转换后的有效负载 List<?> 的形式呈现给应用程序。
以下简单应用程序演示了如何使用此技术:
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<List<Thing>> input() {
return list -> {
System.out.println("Received " + list.size());
list.forEach(thing -> {
System.out.println(thing);
// ...
});
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1]
Thing [field=value2]
批处理中的消息数量由batch-size和receive-timeout属性指定;如果在receive-timeout时间内没有新的消息到达,则会交付一个“短”批次。
消费端批量处理仅在container-type=simple(默认)时支持。 |
如果您希望检查消费者端批处理消息的标题,您应该使用Message<List<?>>;标题是List<Map<String, Object>>的一种,在一个标题AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS中,每个有效载荷元素的标题在相应索引处。
再次强调,这里有一个简单例子:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<Message<List<Thing>>> input() {
return msg -> {
List<Thing> things = msg.getPayload();
System.out.println("Received " + things.size());
List<Map<String, Object>> headers =
(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
for (int i = 0; i < things.size(); i++) {
System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));
// ...
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue1");
return msg;
});
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue2");
return msg;
});
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getfield() {
return this.field;
}
public void setfield(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2
3.7. Rabbit Producer 属性
Rabbit 生成器特有的以下属性可用,并且必须以“0”开头。
但是,如果需要将同一组属性应用于大多数绑定,为了防止重复,Spring Cloud Stream 支持在格式为 spring.cloud.stream.rabbit.default.<property>=<value> 的所有通道上设置值。
也请记住,绑定特定属性会将其默认值覆盖。
- altermateExchange.binding.queue
-
如果交换机尚不存在,并且提供了
name,则将此队列绑定到备用交换机。提供一个简单的持久化队列,无需任何参数;如果需要更复杂的配置,则必须自行配置并绑定队列。默认值:
null
alternateExchange.binding.routingKey
如果交换机尚不存在,并且提供了name和queue,则使用此路由键将队列绑定到备用交换机。默认值: <代码>0 (对于默认交换 <代码>1 备用)
- alternateExchange.exists
-
是否轮换交换机存在,或者需要进行预配。
默认值:
false - alternateExchange.type
-
如果备用交换机尚不存在,则要配置的交换机类型。
默认值:
topic - alternateExchange.name
-
配置目标交换机的备用交换机。
默认值:
null - 自动绑定死信队列
-
是否自动声明死信队列(DLQ)并将其绑定到绑定器的死信交换(DLX)。
默认值:
false。 - 批量处理是否启用
-
是否启用生产者的消息批处理。消息会根据以下属性(在本列表中的接下来三个条目中描述)进行批处理:
batchSize、batchBufferLimit和batchTimeout。批量处理以获取更多信息。接收已批处理的消息。默认值:
false。 - 批处理大小
-
启用批处理时,缓冲的消息数量。
默认值:
100。 - 批次缓冲限制
-
批量处理启用时的最大缓冲区大小。
默认值:
10000。 - 批量超时
-
启用批处理时的批处理超时时间。
默认值:
5000。 - 绑定路由键
-
绑定队列到交换机时使用的路由键(如果
bindQueue是true)。 可以有多个键 - 参见bindingRoutingKeyDelimiter。 对于分区目的地,每个键都会追加上-n。 仅在提供requiredGroups时适用,并且仅对这些组有效。默认值:
#。 - 绑定路由键分隔符
-
当此值不为 null 时,'bindingRoutingKey' 被认为是由该值分隔的键列表;通常使用逗号作为分隔符。 仅在提供
requiredGroups个值时适用,并且只适用于这些组。默认值:
null。 - 绑定队列
-
是否声明队列并将其绑定到目标交换机。如果已设置好自己的基础设施并且先前已创建并绑定了队列,则将其设置为
false。仅在提供requiredGroups时适用,然后仅适用于这些组。默认值:
true。 - 压缩
-
数据发送时是否应进行压缩。
默认值:
false。 - 确认消息通道
-
当
errorChannelEnabled为真时,表示要发送正向交付确认(即发布者确认)的通道。
如果该通道不存在,则会使用此名称注册一个DirectChannel。
必须配置连接工厂以启用发布者确认。
与useConfirmHeader互斥。默认:
nullChannel(确认被丢弃)。 - 死信队列名称
-
死信队列的名称仅在提供
requiredGroups时适用,且仅适用于这些组。默认值:
prefix+destination.dlq - 死信交换
-
将DLX分配给队列。
仅在autoBindDlq为true时相关。
仅当提供requiredGroups时适用,且仅对这些组有效。默认值:'前缀+死信交换'
- 死信交换机类型
-
分配给队列的DLX类型。仅当
autoBindDlq为true时相关。
只有在提供requiredGroups时才适用,然后仅适用于这些组。默认值:'direct'
- 死信路由键
-
分配给队列的死信路由密钥。 仅在
autoBindDlq是true时相关。 只有当提供requiredGroups时才适用,然后只适用于这些组。默认值:
destination - 声明死信交换器
-
是否为目的地声明死信交换。 仅在
autoBindDlq是true时相关。 如果已配置DLX,请设置为false。 仅当提供了requiredGroups时适用,且仅适用于这些组。默认值:
true。 - 声明交换机
-
是否为目的地声明交换机。
默认值:
true。 - 延迟表达式
-
一个SpEL表达式,用于评估要应用到消息的延迟时间(
x-delay标题)。如果交换不是延迟消息交换,则此操作无效。默认情况下:未设置
x-delay标题。 - 延迟交换
-
是否将交换声明为
Delayed Message Exchange。
需要在代理上安装延迟消息交换插件。
将x-delayed-type参数设置为exchangeType。默认值:
false。 - 配送方式
-
交付模式。
默认值:
PERSISTENT。 - 延迟队列绑定参数
-
绑定死信队列(DLQ)到死信交换时应用的参数;与
headersdeadLetterExchangeType一起使用,用于指定要匹配的头部信息。
例如…dlqBindingArguments.x-match=any,…dlqBindingArguments.someHeader=someValue。
仅在提供requiredGroups时适用,并且只对这些组有效。默认:空
- 死信队列死信交换机
-
当声明死信队列(DLQ)时,会分配一个死信交换器(DLX)给该队列。
仅在提供了requiredGroups的情况下适用,并且仅适用于这些组。默认值:
none - 死信队列死信路由键
-
声明死信队列 (DLQ) 后,将一个死信路由键分配给该队列。 仅在提供
requiredGroups时适用,并且仅对这些组有效。默认值:
none - 延迟队列到期
-
闲置的死信队列在被删除之前保留的时间(以毫秒为单位)。仅当提供
requiredGroups时适用,并且仅对这些组有效。默认值:
no expiration - 懒加载
-
声明死信队列时使用
x-queue-mode=lazy参数。 参见 “惰性队列”。 建议使用策略而不是此设置,因为使用策略可以在不删除队列的情况下更改设置。 仅在提供requiredGroups个参数时适用,并且仅对这些组有效。 - 最大dlq长度
-
死信队列中的最大消息数。 仅在提供
requiredGroups时适用,并且仅对这些组有效。默认值:
no limit - 最大字节数
-
死信队列中所有消息的最大总字节数。仅在提供
requiredGroups时适用,且仅对这些组有效。默认值:
no limit - 最大优先级队列
-
死信队列中消息的最大优先级(0-255)
仅在提供requiredGroups时适用,且仅对这些组有效。默认值:
none - dlqQuorum.deliveryLimit
-
当
quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转入死信队列。仅在提供requiredGroups时生效,并且仅适用于这些组。默认:无 - 将应用代理程序默认设置。
- dlqQuorum.enabled
-
当为 true 时,创建一个法定人数死信队列而不是经典队列。
仅在提供requiredGroups时适用,并且仅对这些组有效。(默认值:false)
- dlqQuorum.initialGroupSize
-
当
quorum.enabled=true时,设置初始多数大小。
仅在提供requiredGroups时适用,并且仅适用于这些组。默认:无 - 将应用代理程序默认设置。
- 单个活动消费者
-
设置为 true 将
x-single-active-consumer队列属性设为true。仅当提供requiredGroups时适用,且仅适用于这些组。默认值:
false - dlqTtl
-
声明时应用于死信队列的默认存活时间(以毫秒为单位)。仅在提供
requiredGroups时适用,且仅对这些组生效。默认值:
no limit - 交换自动删除
-
如果
declareExchange是true,则交换是否应自动删除(在最后一个队列被移除后)。默认值:
true。 - 交换机持久化
-
如果
declareExchange是true,则交换是否应该具有持久性(在代理重启后仍然存在)。默认值:
true。 - 交易类型
-
交换类型:
direct,fanout,headers或topic用于非分区目的地,而direct,headers或topic用于分区目的地。默认值:
topic。 - 到期
-
闲置队列在被删除前等待的时间(以毫秒为单位)。仅当提供
requiredGroups时适用,并且仅对这些组有效。默认值:
no expiration - headerPatterns
-
用于映射到传出消息的标题模式。
默认:
['*'](所有头部)。 - 懒加载
-
使用
x-queue-mode=lazy参数声明队列。查看“延迟队列”。考虑使用策略而不是此设置,因为使用策略可以在不删除队列的情况下更改设置。仅当提供了requiredGroups时才适用,然后只适用于这些组。默认值:
false。 - 最大长度
-
队列中的最大消息数。仅在提供
requiredGroups时适用,并且仅对这些组有效。默认值:
no limit - 最大长度字节
-
队列中所有消息的最大总字节数。仅在提供
requiredGroups时适用,并且仅适用于这些组。默认值:
no limit - 最高严重程度
-
队列中消息的最大优先级(0-255)。仅当提供
requiredGroups时适用,并且仅对这些组有效。默认值:
none - 前缀
-
要添加到
destination交换机名称前的前缀。默认值:""。
- 生产者类型
-
生产者的类型。
-
AMQP经典队列和法定人数队列的AMQP客户端 -
STREAM_SYNCRabbitMQ Streams 插件客户端,阻塞直到收到确认 -
STREAM_ASYNCRabbitMQ 流插件客户端,不阻塞默认值:""。
-
- 队列绑定参数
-
绑定队列到交换时应用的参数;用于
headersexchangeType指定要匹配的标头。
例如…queueBindingArguments.x-match=any,…queueBindingArguments.someHeader=someValue。
仅在提供requiredGroups时适用,且仅适用于这些组。默认:空
- 仅队列名组
-
当为
true时,从队列名称与group相等的队列中消费。否则队列名称为destination.group。例如,在使用Spring Cloud Stream消费现有的RabbitMQ队列时,这非常有用。仅在提供requiredGroups时适用,并且仅对这些组有效。默认值:false。
- quorum.deliveryLimit
-
当
quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转入死信队列。仅在提供requiredGroups时生效,并且仅适用于这些组。默认:无 - 将应用代理程序默认设置。
- quorum.enabled
-
当为 true 时,创建一个法定人数队列而不是经典队列。
仅在提供requiredGroups时适用,然后仅适用于这些组。(默认值:false)
- quorum.initialGroupSize
-
当
quorum.enabled=true时,设置初始多数大小。
仅在提供requiredGroups时适用,并且仅适用于这些组。默认:无 - 将应用代理程序默认设置。
- 路由键表达式
-
用于确定发布消息时使用的路由键的SpEL表达式。对于固定路由键,请使用
routingKey。默认值:
destination或destination-<partition>(针对分区目的地)。 - routingKey
-
一个定义了发布消息时使用的固定路由密钥的字符串。
默认:查看
routingKeyExpression - singleActiveConsumer
-
设置为 true 将
x-single-active-consumer队列属性设为true。仅当提供requiredGroups时适用,且仅适用于这些组。默认值:
false - 事务性
-
是否使用事务通道。
默认值:
false。 - TTL
-
声明队列时要应用的默认存活时间(以毫秒为单位)。仅当提供
requiredGroups时才会对这些组生效。默认值:
no limit - 使用确认标题
-
查看发布者确认。与
confirmAckChannel互斥。在RabbitMQ的情况下,内容类型标头可以通过外部应用程序进行设置。</p><p>Spring Cloud Stream将其作为用于任何类型的传输的扩展内部协议的一部分来支持——包括Kafka(0.11之前版本)等不原生支持标头的传输方式。
3.8. 发布者确认
有两种机制可以获取发布消息的结果;在每种情况下,连接工厂必须将publisherConfirmType设置为ConfirmType.CORRELATED。
“遗留”机制是将confirmAckChannel设置为可以从其中异步检索确认的消息通道的bean名称;负确认被发送到错误通道(如果启用)-请参阅错误通道。
在版本3.1中添加的首选机制是使用相关数据标头并根据其Future<Confirm>属性等待结果。
这特别适用于批处理侦听器,因为可以在等待结果之前发送多个消息。要使用此技术,请将useConfirmHeader属性设置为true
以下是一个使用此技术的简单应用程序的示例:
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true
spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
private StreamBridge bridge;
@Bean
Consumer<List<String>> input() {
return list -> {
List<MyCorrelationData> results = new ArrayList<>();
list.forEach(str -> {
log.info("Received: " + str);
MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
results.add(corr);
this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
.build());
});
results.forEach(correlation -> {
try {
Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
log.info(confirm + " for " + correlation.getPayload());
if (correlation.getReturnedMessage() != null) {
log.error("Message for " + correlation.getPayload() + " was returned ");
// throw some exception to invoke binder retry/error handling
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
catch (ExecutionException | TimeoutException e) {
throw new IllegalStateException(e);
}
});
};
}
@Bean
public ApplicationRunner runner(BatchingRabbitTemplate template) {
return args -> IntStream.range(0, 10).forEach(i ->
template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
}
@Bean
public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
}
}
class MyCorrelationData extends CorrelationData {
private final String payload;
MyCorrelationData(String id, String payload) {
super(id);
this.payload = payload;
}
public String getPayload() {
return this.payload;
}
}
正如您所看到的,我们发送每条消息,然后等待发布结果。如果消息无法路由,那么将在未来完成之前用返回的消息填充相关数据。
The correlation data must be provided with a unique id so that the framework can perform the correlation. |
您不能同时设置 useConfirmHeader 和 confirmAckChannel,但在 useConfirmHeader 为 true 时仍可以接收错误通道中的返回消息,不过使用关联头会更方便。
3.9. 对RabbitMQ流插件的初始生产者支持
基本支持 RabbitMQ 流插件 现已提供。
要启用此功能,必须向类路径添加 spring-rabbit-stream jar 文件 - 它必须与 spring-amqp 和 spring-rabbit 版本相同。
当上述生产者属性设置为 STREAM_SYNC 或 STREAM_ASYNC 时,描述的生产者属性不被支持。 |
要配置绑定器以使用流 ProducerType,Spring Boot 将从应用程序属性中配置一个 Environment @Bean。您可以选择添加自定义程序来定制消息处理器。
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
Refer to the RabbitMQ Stream Java Client documentation for information about configuring the environment and producer builder.
3.9.1. RabbitMQ 超级流的生产者支持
见 Super Streams 以了解有关超级流的信息。
使用超流可以单个活动使用者与超级流的每个分区一起自动扩展,具有单一的主动使用者。使用Spring Cloud Stream,您可以使用AMQP或使用流客户端向超级流发布。
| Super 流必须已经存在;不支持通过生产者绑定创建 Super 流。 |
发布到 AMQP 超流:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端向超级流发布:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
使用流客户端时,如果将 confirmAckChannel 设置为通道,则会将成功发送的消息副本发送到该通道。
4. 使用现有的队列/交换机
默认情况下,绑定器会自动 provision 具有与绑定属性 <prefix><destination> 的值同名的目标 topic exchange。目标默认为绑定名,如果未提供。当绑定使用者时,将自动 provision 名为 <prefix><destination>.<group> 的队列(如果指定了 group 绑定属性),或者没有 group 时使用匿名、自动删除队列。队列将使用“匹配全部”通配符路由键(#)进行非分区绑定或 <destination>-<instanceIndex> 进行分区绑定进行绑定。前缀默认为空 String。如果指定了带有 requiredGroups 的输出绑定,则为每个组 provision 队列/绑定。
有几种与兔子相关的绑定属性,允许您修改此默认行为。
如果您有一个想要使用的现有exchange/queue,您可以完全禁用自动配置,假设exchange名为myExchange,queue名为myQueue: \ "
}
-
spring.cloud.stream.bindings.<binding name>.destination=myExchange -
spring.cloud.stream.bindings.<binding name>.group=myQueue -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
如果要让绑定程序为队列/交换机提供服务,但又想使用不同于此处讨论的默认值,请使用以下属性。 有关更多信息,请参阅上述属性文档。
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type> -
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'
有相似的属性在声明死信交换/队列时使用,当autoBindDlq是true。
5. 使用 RabbitMQ 绑定重试
当在绑定程序中启用重试时,监听器容器线程会在配置的任何退避期间被暂停。这在单个消费者需要严格顺序时可能很重要。但是,在其他使用情况下,它会阻止该线程上的其他消息被处理。替代使用绑定程序重试的方法是设置死信队列(DLQ)的时间存活以及DLQ本身的死信配置,并结合使用时间到活期功能。有关此处讨论属性的更多信息,请参阅“RabbitMQ 绑定程序属性”。您可以使用以下示例配置来启用此功能:
-
设置
autoBindDlq为true。绑定器创建一个DLQ。可选地,您可以在deadLetterQueueName中指定名称。 -
设置要等待重新传递之间的时间段,您想要的退避时间。您可以选择从 0 到 1792 秒之间的任何值。
-
将
0设置为默认交换机。从死信队列中过期的消息被路由回原队列,因为默认值1是队列名称(2)。通过不设置属性值来设置为默认交换机,如下一个示例所示。
要强制消息进入死信队列,要么抛出AmqpRejectAndDontRequeueException要么设置requeueRejected为false(默认)并抛出任何异常。
循环不间断,这对临时问题来说没问题,但你可能希望在一定次数尝试后放弃。幸运的是,RabbitMQ 提供了 0 标头,允许你确定已发生多少个周期。
在放弃后确认消息,抛出一个 ImmediateAcknowledgeAmqpException。
5.1 将所有内容整合在一起
以下配置创建了一个交换 myDestination,该交换绑定到一个主题交换,并使用通配符路由键 # 绑定到队列 myDestination.consumerGroup:
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
此配置创建了一个与直连交换机(DLX)绑定的死信队列,路由键为 myDestination.consumerGroup。当消息被拒绝时,它们会被路由到死信队列。5秒后,消息过期并使用队列名称作为路由键,将消息重新路由回原始队列,如下例所示:
@SpringBootApplication
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
Map<?,?> death = message.getHeaders().get("x-death");
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
};
}
}
请注意,x-death 头部中的 count 属性是 Long。
6. 错误通道
从版本 1.3 开始,绑定器会无条件地将每个消费者目标的异常发送到错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。
有关更多信息,请参阅“[spring-cloud-stream-overview-error-handling]”。
RabbitMQ有两种类型的发送失败:
-
返回的消息,
-
被否定确认发布者确认。
后一种情况很少见。
根据 RabbitMQ 文档,[一个 nack] 只有在负责队列的 Erlang 进程中发生内部错误时才会被传递。
如果你发布到具有 reject-publish 队列溢出行为的有限队列,也可能获得负面确认。
除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中所述)外,RabbitMQ绑定器仅在连接工厂正确配置时才会向通道发送消息,如下所示:
-
ccf.setPublisherConfirms(true); -
ccf.setPublisherReturns(true);
使用Spring Boot配置连接工厂时,设置以下属性:<br>
-
spring.rabbitmq.publisher-confirms -
spring.rabbitmq.publisher-returns
返回消息的ErrorMessage负载是一个带有以下属性的ReturnedAmqpMessageException:
-
failedMessage: 未能发送的 spring-messagingMessage<?>。 -
amqpMessage: 原生的 Spring AMQPMessage。 -
replyCode:一个整数值,表示失败的原因(例如,312-无路由)。 -
replyText: 失败的原因(例如,NO_ROUTE)。 -
exchange: 消息发布的交换机。 -
routingKey: 消息发布时使用的路由键。
另请参阅发布者确认,了解接收退回消息的替代机制。
对于负确认,负载是一个具有以下属性的NackedAmqpMessageException:
-
failedMessage: 未能发送的 spring-messagingMessage<?>。 -
nackReason: 如果有原因(您可能需要检查代理日志以获取更多信息)。
这些异常没有自动处理方式(例如发送到死信队列)。
您可以使用自己的Spring集成流程来消费这些异常。
7. Rabbit Binder 健康指标
The health indicator for Rabbit binder delegates to the one provided from Spring Boot. For more information on this, see this.
您可以使用属性 management.health.binders.enabled 在绑定器级别禁用此运行状况指标,并将其设置为 false。在多绑定器环境中,必须在绑定器环境属性上设置此属性。
当健康指标被禁用时,您应该在健康量表端点中看到如下所示的内容:
"rabbit": {
"status": "UNKNOWN"
}
在Spring Boot级别,如果你想禁用Rabbit健康指标,需要将属性设置为management.health.rabbit.enabled并设置为false。
8. 死信队列处理
由于无法预知用户希望如何处理死信消息,框架不提供任何标准机制来处理这些消息。
如果导致消息进入死信队列的原因是暂时性的,您可能希望将这些消息重新路由回原始队列。
但是,如果问题是永久性的,则可能会导致无限循环。
下面的Spring Boot应用程序显示了如何将这些消息重新路由回原始队列的例子,但在三次尝试后会将它们移动到第三个“停车场”队列。
第二个例子使用RabbitMQ延迟消息交换为重新排队的消息引入延迟。
在这个例子中,每次尝试时延迟都会增加。
这些示例使用@RabbitListener从DLQ接收消息。
在批处理过程中也可以使用RabbitTemplate.receive()。
示例假设原始目标为 so8400in,消费组为 so8400。
8.1 非分区目标
前两个示例适用于目标位置未分区的情况:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8.2. 分区目标
使用分区目标时,所有分区共用一个死信队列。我们通过消息头来确定原始队列。
8.2.1. republishToDlq=false
当 republishToDlq 是 false 时,RabbitMQ 使用包含有关原始目标信息的 x-death 标头将消息发布到死信交换/队列(DLX/DLQ),如下例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8.2.2. republishToDlq=true
当 republishToDlq 是 true 时,重新发布恢复程序会将原始交换机和路由键添加到头信息中,如下例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
9. 使用RabbitMQ绑定程序进行分区
RabbitMQ 不原生支持分区。
有时,向特定分区发送数据是有利的——例如,当您希望严格按顺序处理消息时,某个客户的全部消息应发送到同一个分区。
该 RabbitMessageChannelBinder 通过为每个分区绑定一个队列到目标交换器来提供分区。
以下 Java 和 YAML 示例显示了如何配置生产者:
@SpringBootApplication
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
|
前面示例中的配置使用了默认分区(
|
以下配置设置了一个主题交换:
以下队列结维到该 Exchange:
以下绑定将队列与交换机关联:
下面的 Java 和 YAML 示例继续了前面的例子,展示了如何配置消费者:
@SpringBootApplication
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}
}
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
0不支持动态缩放。 每个分区必须至少有一个消费者。 消费者的1用于指示哪个分区被消费。 如Cloud Foundry等平台只能有一个实例带有2。 |