使用记录

在上面uppercase函数,我们将记录用作Flux<String>然后将其生成为Flux<String>. 在某些情况下,您可能需要以原始接收格式接收记录 -ReceiverRecord. 这是这样一个功能。spring-doc.cadn.net.cn

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

请注意,在此函数中,我们将记录用作Flux<ReceiverRecord<byte[], byte[]>>然后将其生成为Flux<String>.ReceiverRecord是基本接收记录,它是专门的 KafkaConsumerRecord在卡夫卡反应堆中。 当使用响应式 Kafka 绑定器时,上述函数将允许您访问ReceiverRecord类型。 但是,在这种情况下,您需要为 RecordMessageConverter 提供自定义实现。 默认情况下,响应式 Kafka 绑定器使用 MessagingMessageConverter 将有效负载和标头从ConsumerRecord. 因此,当您的处理程序方法收到它时,有效负载已经从接收到的记录中提取并传递到该方法中,就像我们上面看到的第一个函数一样。 通过提供自定义RecordMessageConverter实现,您可以覆盖默认行为。 例如,如果要将记录作为原始记录使用Flux<ReceiverRecord<byte[], byte[]>>,则可以在应用程序中提供以下 bean 定义。spring-doc.cadn.net.cn

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

然后,您需要指示框架使用此转换器进行所需的绑定。 这是一个基于我们的lowercase功能。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"

lowercase-in-0是我们的lowercase功能。 对于出站(lowercase-out-0),我们仍然使用常规MessagingMessageConverter.spring-doc.cadn.net.cn

toMessage实现上述,我们收到原始的ConsumerRecord (ReceiverRecord因为我们处于响应式 Binder 上下文中),然后将其包装在Message. 然后,该消息有效负载是ReceiverRecord提供给用户方法。spring-doc.cadn.net.cn

如果reactiveAutoCommitfalse(默认),调用rec.receiverOffset().acknowledge()(或commit()) 导致偏移量被提交;如果reactiveAutoCommittrue,助焊剂供应ConsumerRecords 代替。 请参阅reactor-kafka文档和 JavaDocs 了解更多信息。spring-doc.cadn.net.cn