对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
可观察性
Spring 通过 Micrometer 提供对可观测性的支持,它定义了一个观察概念,可以在应用程序中同时启用指标和跟踪。
所需的依赖项
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core-micrometer</artifactId>
</dependency>
以及可用的示踪桥之一。例如,Zipkin Brave
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
命令式函数
命令式函数用观察包装器包装ObservationFunctionAroundWrapper
它提供了必要的基础设施来处理与观察注册表的交互。
这种交互发生在函数的每次调用中,这实际上意味着观察附加到
函数(即,每条消息的单个观察)。
换句话说,对于命令式函数,如果存在前面提到的必需依赖项,则可观测性将起作用。
反应函数
响应式函数本质上与命令式函数不同,因此没有用ObservationFunctionAroundWrapper
.
命令式函数是一个消息处理程序函数,每次有消息时框架都会调用,有点像典型的事件处理程序,其中对于 N 条消息,将有 N 次调用此类函数。这允许我们包装这样的函数,用额外的功能来装饰它,例如错误处理、重试,当然还有可观察性。
响应式函数是初始化函数。它的工作是将用户提供的流处理代码 (Flux) 与绑定器提供的源流和目标流连接起来。在应用程序启动期间,它仅被调用一次。一旦流代码与源/目标流连接,我们就无法看到或控制实际的流处理。它掌握在响应式 API 手中。反应函数还带来了一个额外的变量。鉴于该函数使您可以了解整个流链(而不仅仅是单个事件),默认的观察单位应该是什么? 流链中的单个项目?一系列物品?如果一段时间后没有消息怎么办?等。。.我们想要强调的是,对于响应式函数,我们不能假设任何事情。(有关响应式函数和命令式函数之间差异的更多信息,请参阅响应式函数)。
因此,就像重试和错误处理一样,您需要手动处理观察。
值得庆幸的是,您可以通过使用tap
响应式 API 的作,同时提供ObservationRegistry
.这样的段定义了一个观察单位,它可以是通量中的单个项目或一个范围,或者你可能想在流中观察到的任何其他内容。
@SpringBootApplication
public class DemoStreamApplication {
Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);
public static void main(String[] args) {
Hooks.enableAutomaticContextPropagation();
SpringApplication.run(DemoStreamApplication.class, args);
}
@Bean
public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
return flux -> flux.flatMap(item -> {
return Mono.just(item)
.map(value -> value.toUpperCase())
.doOnNext(v -> logger.info(v))
.tap(Micrometer.observation(registry));
});
}
}
上面的示例模拟将 Observation 附加到单个消息处理(即命令式函数),因为在这种情况下,观察单位以 Mono.just(..) 开头,最后一个作附加了ObservationRegistry
给订阅者。
如果订阅者已经附加了观察,它将用于为 上游的链/段创建子观察tap
,但是,正如我们已经说过的,默认情况下,框架不会将任何 Observation 附加到您返回的流链。