此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

消耗批次

从 3.0 版本开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,通过轮询 Kafka 收到的所有记录Consumer将作为List<?>到 listener 方法。 否则,将一次调用一条记录。 批处理的大小由 Kafka 使用者属性控制max.poll.records,fetch.min.bytes,fetch.max.wait.ms;有关更多信息,请参阅 Kafka 文档。spring-doc.cadn.net.cn

接收批次时,允许使用以下类型签名:spring-doc.cadn.net.cn

List<Person>
Message<List<Person>>

在第一个选项中List<Person>,侦听器将不会获得任何消息头。 如果第二种类型签名 (Message<List<Person>>) ,则可以访问标头;但是,所有标头仍采用Collection. 让我们以以下示例为例。spring-doc.cadn.net.cn

假设Message包含一个包含 10 个的列表Person对象。 这MessageHeadersMessage包含标头映射,其中 key 作为标头名称,value 作为列表。 此列表包含该标头的标头值,顺序与有效负载列表相同。 因此,应用程序可以从MessageHeaders映射基于有效负载列表的迭代。spring-doc.cadn.net.cn

请注意,以List<Message<Person>>在批处理模式下使用时不允许。spring-doc.cadn.net.cn

从版本开始4.0.2,则活页夹在批处理模式下使用时支持 DLQ 功能。 请记住,在处于批处理模式的使用者绑定上使用 DLQ 时,从上一次轮询接收的所有记录都将传递到 DLQ 主题。spring-doc.cadn.net.cn

使用批处理模式时,不支持在绑定器中重试,因此maxAttempts将被覆盖为 1。 您可以配置DefaultErrorHandler(使用ListenerContainerCustomizer) 以实现与在 Binder 中重试类似的功能。 您也可以使用手册AckMode并调用Ackowledgment.nack(index, sleep)提交部分批次的偏移量并重新传递剩余记录。 有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档
接收时KafkaNull对象,则接收到的列表将包含相应的 null 元素KafkaNull对象。 这对两者来说都是如此List<Person>Message<List<Person>>style 类型签名。

在批处理模式下使用时的可观测性

批量消费记录时,不直接支持观测追踪传播功能。 这是因为 Kafka 绑定器使用的 Spring for Apache Kafka 库不支持对批处理侦听器进行跟踪;它仅支持记录侦听器。 在批处理侦听器中,接收到的记录可能来自多个主题/分区和多个生产者,其中添加跟踪信息是可选的。 由于批处理中的记录之间可能没有任何相关性,因此框架无法对跟踪它们做出任何假设,例如将它们作为单个跟踪 ID 提供等。 如果使用Message<List<String>>,然后你可以得到一个名为kafka_batchConvertedHeaders,其中包含一个列表,其条目数与有效负载相同。 此列表有一个Map包含跟踪标头。 但是,应用程序需要正确迭代并开始观察。spring-doc.cadn.net.cn