这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 spring-doc.cadn.net.cn

时间戳提取器

Kafka 流允许您根据各种时间戳概念控制对使用者记录的处理。默认情况下,Kafka 流从使用者记录中提取嵌入式时间戳元数据。您可以通过为每个输入绑定提供不同的TimestampExtractor实现来更改此默认行为。下面是一些关于如何做到这一点的详细信息。spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然后你为每个消费者绑定设置上面的TimestampExtractorbean名字。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果跳过输入消费者绑定以设置自定义时间戳提取器,该消费者将使用默认设置。spring-doc.cadn.net.cn