这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 spring-doc.cadn.net.cn

Spring框架的绑定可视化和控制(官方教程)

Spring Cloud Stream支持通过Actuator端点以及程序化方式对绑定进行可视化和控制。spring-doc.cadn.net.cn

通过编程方式

自版本 3.1 我们公开了org.springframework.cloud.stream.binding.BindingsLifecycleController它被注册为 bean 并且一旦注入就可以用来控制单个绑定的生命周期spring-doc.cadn.net.cn

例如,查看其中一个测试用例的片段。如您所见,我们从spring应用上下文检索BindingsLifecycleController,并执行各个方法来控制echo-in-0绑定的生命周期。。spring-doc.cadn.net.cn

BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();

定义新绑定并管理现有绑定

此外,从4.2版本开始,使用BindingsLifecycleController,您可以通过访问其Consumer和Producer配置属性来定义新的绑定以及修改现有的绑定配置,以实现对其值的更动态管理。spring-doc.cadn.net.cn

要定义新的输入绑定,可以调用BindingsLifecycleController.defineInputBinding(..)方法(见下面)。有等效的defineOutputBinding(..)方法。spring-doc.cadn.net.cn

BindingsLifecycleController controller = context.getBean(BindingsLifecycleController.class);
KafkaConsumerProperties consumerProperties = controller.defineInputBinding("test-input-binding");

您可以调用 getExtensionProperties(..) 方法来管理它的属性。spring-doc.cadn.net.cn

KafkaConsumerProperties properties = controller.getExtensionProperties("test-input-binding”);
与从函数定义派生的绑定名称不同,显式定义的绑定不带in-0/out-0后缀,因为它们不是由实际函数 backing 的。

getExtensionProperties(..) 操作用于确保获取配置属性类的适当类型,具体取决于绑定器和使用的绑定,您可以安全地将扩展属性强制转换为适当的类型。在我们的示例中,它是KafkaConsumerProperties属性。spring-doc.cadn.net.cn

依类型的不同,您可能需要重新启动绑定才能使更改生效(如上所述)

执行器

由于启动程序和 Web 是可选的,因此您必须首先手动添加 Web 依赖项之一以及启动程序依赖项。下面的例子展示了如何为 Web 框架添加依赖项:spring-doc.cadn.net.cn

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下示例显示如何添加 WebFlux 框架的依赖项:spring-doc.cadn.net.cn

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

您可以按如下方式添加Actuator依赖项:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在 Cloud Foundry 中运行 Spring Cloud Stream 2.0 应用程序,您必须将 0 和 1 添加到类路径。否则,由于健康检查失败,应用程序将无法启动。

还必须通过设置以下属性来启用bindings执行器端点: --management.endpoints.web.exposure.include=bindingsspring-doc.cadn.net.cn

一旦满足这些前提条件,您应该在启动应用程序时在日志中看到以下内容:spring-doc.cadn.net.cn

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要查看当前绑定,请访问以下 URL: <host>:<port>/actuator/bindingsspring-doc.cadn.net.cn

Alternative, to see a single binding, access one of the URLs similar to the following: <host>:<port>/actuator/bindings/<bindingName>;spring-doc.cadn.net.cn

您还可以通过在同一个URL上发布state参数作为JSON来停止、启动、暂停和继续单独的绑定,如以下示例所示:spring-doc.cadn.net.cn

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSEDRESUMED 仅在对应的 binder 以及其底层技术支持时生效,否则会在日志中看到警告信息。 目前,仅 Kafka 和 [Solace](github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pause resume) binders 支持 PAUSEDRESUMED 状态。

清理敏感数据

当使用绑定执行器端点时,有时需要对任何敏感数据进行消毒,例如用户凭据、SSL 密钥信息等。要实现这一点,用户可以向应用程序提供一个 0(由 Spring Boot 提供)作为 Bean。这是提供 Apache Kafka 的 1 值时混淆数据的示例。spring-doc.cadn.net.cn

@Bean
public SanitizingFunction sanitizingFunction() {
	return sanitizableData -> {
		if (sanitizableData.getKey().equals("sasl.jaas.config")) {
			return sanitizableData.withValue("data-scrambled!!");
		}
		else {
			return sanitizableData;
		}
	};
}