这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 spring-doc.cadn.net.cn

消费记录

在上面的upppercase函数中,我们消费记录为Flux<String>,然后以Flux<String>的形式产生它。 在某些情况下,你可能需要接收原始接收到的记录格式 - ReceiverRecord。 以下是这样的一个函数。spring-doc.cadn.net.cn

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

在这个功能中,请注意,我们把记录作为0 ,然后把它生产为1 ReceiverRecord是在Reactor Kafka中作为Kafka特化的ConsumerRecord的基本接收记录。当使用响应式Kafka绑定器时,上面的函数将为每个传入的记录提供对ReceiverRecord类型的访问。但是,在这种情况下,您需要为 RecordMessageConverter 提供自定义实现。默认情况下,反应性Kafka绑定器使用<1>转换器,该转换器从<2>中转换有效负载和标头。因此,当你的处理器方法接收它时,有效负载已经从接收到的记录中提取出来,并像我们上面看过的第一个函数一样传递给该方法。通过在应用程序中提供自定义RecordMessageConverter实现,您可以覆盖默认行为。比如,如果你想把记录当作原始的Flux<ReceiverRecord<byte[], byte[]>>来使用,那么你可以提供以下在应用程序中的Bean定义。spring-doc.cadn.net.cn

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

然后,您需要指示框架在所需绑定中使用此转换器。下面是一个基于我们lowercase函数的例子。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"

lowercase-in-0是我们的lowercase函数的输入绑定名称。 对于出站(lowercase-out-0),我们仍然使用常规MessagingMessageConverterspring-doc.cadn.net.cn

在上面的toMessage实现中,我们接收原始的ConsumerRecord(ReceiverRecord,因为我们处于响应式绑定器上下文)并将其包装在Message中。 然后,这个消息载荷,即ReceiverRecord,将提供给用户方法。spring-doc.cadn.net.cn

如果reactiveAutoCommitfalse(默认值),调用rec.receiverOffset().acknowledge()(或commit())以导致偏移量被提交;如果reactiveAutoCommittrue,流提供ConsumerRecord个s。spring-doc.cadn.net.cn