对于最新稳定版本,请使用spring-cloud-stream 5.0.1spring-doc.cadn.net.cn

可观测性

Spring cloud Stream 在 Spring Cloud Function 层级整合了这种支持,通过提供一个 ObservationFunctionAroundWrapper 抽象,它会包装函数来开箱即用地处理观察数据。spring-doc.cadn.net.cn

必需的依赖项spring-doc.cadn.net.cn

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core-micrometer</artifactId>
</dependency>

以及可用的跟踪桥之一。例如 Zipkin Bravespring-doc.cadn.net.cn

<dependency>
	<groupId>io.micrometer</groupId>
	<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>

指令式函数

ObservationFunctionAroundWrapper 包装了命令式函数,提供必要的基础架构来处理与观察注册表的交互。
每次函数调用都会发生此类交互,这意味着观察将附加到函数的每次调用(即每消息一个观察)。
换句话说,对于命令式函数,如果存在所需的依赖项,则可观察性将自动工作。
spring-doc.cadn.net.cn

响应式函数

响应式函数本质上与命令式函数不同,因此不使用 ObservationFunctionAroundWrapper 包装。spring-doc.cadn.net.cn

命令式函数 是消息处理程序函数,并在有消息时由框架调用,类似于您典型的事件处理程序,在 N 条消息的情况下,将有 N 次此类函数的调用。 这允许我们包装这样的函数来装饰它以提供额外的功能,例如 错误处理 重试 ,以及当然还有 可观测性 spring-doc.cadn.net.cn

响应式函数 是初始化函数。其作用是将用户提供的流处理代码 (Flux) 与绑定器提供的源和目标流连接起来。它仅在应用程序启动时调用一次。一旦流代码与源/目标流连接,我们就无法看到或控制实际的流处理过程,这完全由响应式 API 掌控。响应式函数还引入了一个额外的变量。鉴于该函数让您能够看到整个流链(而不仅仅是一个单独的事件),那么默认的观察单位应该是什么? 流链中的单个元素?还是一个范围内的元素?如果经过一段时间后没有消息怎么办?等等。我们想要强调的是,在使用响应式函数时,我们不能做出任何假设。(有关响应式函数和命令式函数之间差异的更多信息,请参阅 响应式函数)。spring-doc.cadn.net.cn

所以,就像重试和错误处理一样,您需要手动处理观察。spring-doc.cadn.net.cn

感谢您可以轻松地通过使用响应式API的tap操作来访问流的一个段,并提供一个ObservationRegistry实例。这样的段定义了一个观察单位,可以是流中的单个项目、范围或其他您可能想要在流中观察的内容。spring-doc.cadn.net.cn

@SpringBootApplication
public class DemoStreamApplication {

	Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);

	public static void main(String[] args) {
		Hooks.enableAutomaticContextPropagation();
		SpringApplication.run(DemoStreamApplication.class, args);
	}

	@Bean
	public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
		return flux -> flux.flatMap(item -> {
			return Mono.just(item)
                             .map(value -> value.toUpperCase())
                             .doOnNext(v -> logger.info(v))
                             .tap(Micrometer.observation(registry));
		});
	}
}

该示例以上述方式为消息处理(即命令式函数)附加了观测值,因为在这种情况下观察单位从Mono.just(..)开始,最后一个操作将1附加到订阅者。spring-doc.cadn.net.cn

如果已经有一个观察附加到订阅者,它将用于在 tap 上游为链/段创建一个子 Observation,然而正如我们已经说明的,框架默认不会将 Observation 附加到您返回的流链。spring-doc.cadn.net.cn