|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
可观测性在Reactive Kafka Binder中
本节介绍如何在响应式 Kafka 绑定中启用基于 Micrometer 的可观察性。
生产者绑定
生产者绑定内置支持可观测性。 要启用它,请设置以下属性:
spring.cloud.stream.kafka.binder.enable-observation
当该属性设置为 true 时,您可以观察记录的发布。
使用 StreamBridge 和常规的 Supplier<?> beans 发布记录时也可以进行观察。
消费者绑定
在消费者侧启用可观测性比在生产者侧更为复杂。 有两条起点用于消费者绑定:
-
A topic where data is published via a producer binding
-
一个在 Spring Cloud Stream 外部产生数据的主题
在第一种情况下,应用程序理想地希望将可观测性标头传递到消费者入站。<br/>在第二种情况下,如果没有启动上游观察,则会启动新的观察。
示例:带有可观测性的函数
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
return s -> s.flatMap(record -> {
Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
observationRegistry
);
return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
.build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}
在此示例中:
-
收到记录后,将创建一个观察。
-
如果有上游观察,它将是
KafkaRecordReceiverContext的一部分。 -
在上下文延迟的情况下创建了一个
Mono。 -
当调用
map操作时,上下文可以访问正确的观察。 -
操作
flatMap的结果作为Flux<Message<?>>发送回绑定。 -
出站记录将具有与输入绑定相同的可观测性标头。
示例:带有可观测性的消费者
@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
observationRegistry).observe(() -> System.out.println(record)))
.subscribe();
}
在这种情况下:
-
由于没有输出绑定,
doOnNext被用在Flux上而不是flatMap。 -
对
observe的直接调用启动了观察,并在完成后正确地关闭。