|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
消费记录
在上面的uppercase函数中,我们消费记录为Flux<String>,然后以Flux<String>的形式产生它。
在某些情况下,你可能需要接收原始接收到的记录格式 - ReceiverRecord。
以下是这样的一个函数。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
在这个功能中,请注意,我们把记录作为0 ,然后把它生产为1 。 ReceiverRecord是在Reactor Kafka中作为Kafka特化的ConsumerRecord的基本接收记录。当使用响应式Kafka绑定器时,上面的函数将为每个传入的记录提供对ReceiverRecord类型的访问。但是,在这种情况下,您需要为 RecordMessageConverter 提供自定义实现。默认情况下,反应性Kafka绑定器使用<1>转换器,该转换器从<2>中转换有效负载和标头。因此,当你的处理器方法接收它时,有效负载已经从接收到的记录中提取出来,并像我们上面看过的第一个函数一样传递给该方法。通过在应用程序中提供自定义RecordMessageConverter实现,您可以覆盖默认行为。比如,如果你想把记录当作原始的Flux<ReceiverRecord<byte[], byte[]>>来使用,那么你可以提供以下在应用程序中的Bean定义。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然后,您需要指示框架在所需绑定中使用此转换器。下面是一个基于我们lowercase函数的例子。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0是我们的lowercase函数的输入绑定名称。
对于出站(lowercase-out-0),我们仍然使用常规MessagingMessageConverter。
在上面的toMessage实现中,我们接收原始的ConsumerRecord(ReceiverRecord,因为我们处于响应式绑定器上下文)并将其包装在Message中。
然后,这个消息载荷,即ReceiverRecord,将提供给用户方法。
如果reactiveAutoCommit是false(默认值),调用rec.receiverOffset().acknowledge()(或commit())以导致偏移量被提交;如果reactiveAutoCommit是true,流提供ConsumerRecord个s。