可观测性在Reactive Kafka Binder中

本节介绍如何在响应式 Kafka 绑定中启用基于 Micrometer 的可观察性。spring-doc.cadn.net.cn

生产者绑定

生产者绑定内置支持可观测性。 要启用它,请设置以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.enable-observation

当该属性设置为 true 时,您可以观察记录的发布。 使用 StreamBridge 和常规的 Supplier<?> beans 发布记录时也可以进行观察。spring-doc.cadn.net.cn

消费者绑定

在消费者侧启用可观测性比在生产者侧更为复杂。 有两条起点用于消费者绑定:spring-doc.cadn.net.cn

  1. A topic where data is published via a producer bindingspring-doc.cadn.net.cn

  2. 一个在 Spring Cloud Stream 外部产生数据的主题spring-doc.cadn.net.cn

在第一种情况下,应用程序理想地希望将可观测性标头传递到消费者入站。<br/>在第二种情况下,如果没有启动上游观察,则会启动新的观察。spring-doc.cadn.net.cn

示例:带有可观测性的函数

@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {

	return s -> s.flatMap(record -> {
		Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
		null,
		KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
		() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
		observationRegistry
		);

		return Mono.deferContextual(contextView -> Mono.just(record)
			.map(rec -> new String(rec.value()).toLowerCase())
			.map(rec -> MessageBuilder.withPayload(rec)
				.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
				.build()))
			.doOnTerminate(receiverObservation::stop)
			.doOnError(receiverObservation::error)
			.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
    });
}

在此示例中:spring-doc.cadn.net.cn

  1. 收到记录后,将创建一个观察。spring-doc.cadn.net.cn

  2. 如果有上游观察,它将是KafkaRecordReceiverContext的一部分。spring-doc.cadn.net.cn

  3. 在上下文延迟的情况下创建了一个Monospring-doc.cadn.net.cn

  4. 当调用map操作时,上下文可以访问正确的观察。spring-doc.cadn.net.cn

  5. 操作flatMap的结果作为Flux<Message<?>>发送回绑定。spring-doc.cadn.net.cn

  6. 出站记录将具有与输入绑定相同的可观测性标头。spring-doc.cadn.net.cn

示例:带有可观测性的消费者

@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
	return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
			null,
			KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
			() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
			observationRegistry).observe(() -> System.out.println(record)))
		.subscribe();
}

在这种情况下:spring-doc.cadn.net.cn

  1. 由于没有输出绑定,doOnNext 被用在 Flux 上而不是 flatMapspring-doc.cadn.net.cn

  2. observe的直接调用启动了观察,并在完成后正确地关闭。spring-doc.cadn.net.cn