接收批量消息
使用 RabbitMQ 绑定程序时,消费者绑定会处理两种类型的批次:
由生产者创建的批次
通常,如果生产者绑定具有 batch-enabled=true(参见Rabbit Producer Properties),或者消息由BatchingRabbitTemplate创建,则批次中的元素将作为对监听器方法的单独调用返回。从版本3.0开始,如果将spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,则可以将此类批处理呈现给监听器方法。List<?>。
消费者端批处理
从版本 3.1 开始,可以配置消费者将多个传入消息组合成一个批次,该批次将以转换后的有效负载 List<?> 的形式呈现给应用程序。
以下简单应用程序演示了如何使用此技术:
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<List<Thing>> input() {
return list -> {
System.out.println("Received " + list.size());
list.forEach(thing -> {
System.out.println(thing);
// ...
});
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1]
Thing [field=value2]
批处理中的消息数量由batch-size和receive-timeout属性指定;如果在receive-timeout时间内没有新的消息到达,则会交付一个“短”批次。
消费端批量处理仅在container-type=simple(默认)时支持。 |
如果您希望检查消费者端批处理消息的标题,您应该使用Message<List<?>>;标题是List<Map<String, Object>>的一种,在一个标题AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS中,每个有效载荷元素的标题在相应索引处。
再次强调,这里有一个简单例子:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<Message<List<Thing>>> input() {
return msg -> {
List<Thing> things = msg.getPayload();
System.out.println("Received " + things.size());
List<Map<String, Object>> headers =
(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
for (int i = 0; i < things.size(); i++) {
System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));
// ...
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue1");
return msg;
});
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue2");
return msg;
});
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getfield() {
return this.field;
}
public void setfield(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2