此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
消耗批次
从 3.0 版本开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode
设置为true
,通过轮询 Kafka 收到的所有记录Consumer
将作为List<?>
到 listener 方法。
否则,将一次调用一条记录。
批处理的大小由 Kafka 使用者属性控制max.poll.records
,fetch.min.bytes
,fetch.max.wait.ms
;有关更多信息,请参阅 Kafka 文档。
接收批次时,允许使用以下类型签名:
List<Person>
Message<List<Person>>
在第一个选项中List<Person>
,侦听器将不会获得任何消息头。
如果第二种类型签名 (Message<List<Person>>
) ,则可以访问标头;但是,所有标头仍采用Collection
.
让我们以以下示例为例。
假设Message
包含一个包含 10 个的列表Person
对象。
这MessageHeaders
的Message
包含标头映射,其中 key 作为标头名称,value 作为列表。
此列表包含该标头的标头值,顺序与有效负载列表相同。
因此,应用程序可以从MessageHeaders
映射基于有效负载列表的迭代。
请注意,以List<Message<Person>>
在批处理模式下使用时不允许。
从版本开始4.0.2
,则活页夹在批处理模式下使用时支持 DLQ 功能。
请记住,在处于批处理模式的使用者绑定上使用 DLQ 时,从上一次轮询接收的所有记录都将传递到 DLQ 主题。
使用批处理模式时,不支持在绑定器中重试,因此maxAttempts 将被覆盖为 1。
您可以配置DefaultErrorHandler (使用ListenerContainerCustomizer ) 以实现与在 Binder 中重试类似的功能。
您也可以使用手册AckMode 并调用Ackowledgment.nack(index, sleep) 提交部分批次的偏移量并重新传递剩余记录。
有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。 |
接收时KafkaNull 对象,则接收到的列表将包含相应的 null 元素KafkaNull 对象。
这对两者来说都是如此List<Person> 和Message<List<Person>> style 类型签名。 |
在批处理模式下使用时的可观测性
批量消费记录时,不直接支持观测追踪传播功能。
这是因为 Kafka 绑定器使用的 Spring for Apache Kafka 库不支持对批处理侦听器进行跟踪;它仅支持记录侦听器。
在批处理侦听器中,接收到的记录可能来自多个主题/分区和多个生产者,其中添加跟踪信息是可选的。
由于批处理中的记录之间可能没有任何相关性,因此框架无法对跟踪它们做出任何假设,例如将它们作为单个跟踪 ID 提供等。
如果使用Message<List<String>>
,然后你可以得到一个名为kafka_batchConvertedHeaders
,其中包含一个列表,其条目数与有效负载相同。
此列表有一个Map
包含跟踪标头。
但是,应用程序需要正确迭代并开始观察。