|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
消费批次
从版本 3.0 开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true时,轮询 Kafka Consumer接收的所有记录都将作为List<?>传递给监听器方法。否则,该方法将一个记录接一个地被调用。批处理的大小由 Kafka 消费者属性max.poll.records、fetch.min.bytes、fetch.max.wait.ms
接收批次时,允许以下类型签名:
List<Person>
Message<List<Person>>
在第一个选项中,侦听器不会收到任何消息标题。
如果使用第二个类型签名(Message<List<Person>>),则可以访问标题;但是,所有标题仍然保持为 2 的形式。
我们来看一个例子。
假设 Message 包含一个包含十个 Person 对象的列表。
Message 的 MessageHeaders 包含一个以标头名称为键、值为列表的头信息映射。
该列表中的每个元素与有效载荷列表中的对应项顺序一致。
因此,应用程序需根据对有效载荷列表的迭代正确从 MessageHeaders 映射中访问标头。
注意,在批处理模式下消费时,形式为 List<Message<Person>> 的类型签名是不允许的。
从版本4.0.2开始,当以批量模式消费时,绑定器支持死信队列(DLQ)功能。
请注意,当在处于批量模式的消费者绑定上使用DLQ时,将从之前的轮询中接收到的所有记录都会被传递到DLQ主题。
在使用批处理模式时,绑定器内不支持重试,因此maxAttempts会被覆盖为1。您可以配置一个 您还可以手动使用 有关这些技术的更多信息,请参阅Spring for Apache Kafka 文档。 |
在批量模式下接收到KafkaNull个对象时,对应于KafkaNull个对象的接收列表将包含一个null元素。这对List<Person>和Message<List<Person>>样式类型签名都适用。 |
批量模式下消费时的可观测性
当批量消费记录时,不直接支持观察跟踪传播功能。
这是因为Kafka绑定所使用的Apache Kafka库(Spring for Apache Kafka)不支持批处理监听器上的跟踪;它仅支持记录监听器。
在批处理监听器中,接收到的记录可能来自多个主题/分区和多个生产者,在这些情况下添加跟踪信息是可选的。
由于批次中的记录之间可能没有关联,框架无法对它们进行任何假设,例如提供单个跟踪ID等。
如果您使用Message<List<String>>类型的签名,则可以获取名为kafka_batchConvertedHeaders的头部,该头部包含与有效负载具有相同数量条目的列表。
此列表有一个Map,其中包含跟踪头部。
但是,应用程序需要正确迭代并启动一个观察。