对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

绑定可视化和控制

Spring Cloud Stream 支持通过 Actuator 端点以及编程方式对 Bindings 进行可视化和控制。spring-doc.cadn.net.cn

程序化方式

从 3.1 版本开始,我们公开了org.springframework.cloud.stream.binding.BindingsLifecycleController注册为 bean 并一次 注入可用于控制单个绑定的生命周期spring-doc.cadn.net.cn

例如,查看其中一个测试用例中的片段。如您所见,我们检索BindingsLifecycleController从 Spring 应用程序上下文中执行单个方法来控制echo-in-0捆绑。。spring-doc.cadn.net.cn

BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();

此外,从 4.2 版开始,您还可以访问 Consumer 和 Producer 配置属性,以便更动态地管理其值。 您可以从BindingsLifecycleController基于绑定名称。 例如。spring-doc.cadn.net.cn

KafkaConsumerProperties properties = controller.getExtensionProperties("log-in-0”);
RabbitProducerProperties properties = controller.getExtensionProperties(“log-out-0”);

getExtensionProperties(..)定义作以确保您获得正确类型的配置属性类。spring-doc.cadn.net.cn

根据您更改的属性类型,您可能需要重新启动绑定才能使其生效(前面看到的屁股)

驱动器

由于执行器和 Web 是可选的,因此您必须首先添加其中一个 Web 依赖项,并手动添加执行器依赖项。 以下示例演示如何添加 Web 框架的依赖项:spring-doc.cadn.net.cn

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下示例显示如何添加 WebFlux 框架的依赖项:spring-doc.cadn.net.cn

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

您可以按如下方式添加 Actuator 依赖项:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在 Cloud Foundry 中运行 Spring Cloud Stream 2.0 应用程序,您必须将spring-boot-starter-webspring-boot-starter-actuator到类路径。否则, 由于运行状况检查失败,应用程序将无法启动。

您还必须启用bindings执行器端点,通过设置以下属性:--management.endpoints.web.exposure.include=bindings.spring-doc.cadn.net.cn

一旦满足这些前提条件。应用程序启动时,您应该在日志中看到以下内容:spring-doc.cadn.net.cn

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要可视化当前绑定,请访问以下 URL:<host>:<port>/actuator/bindingsspring-doc.cadn.net.cn

或者,要查看单个绑定,请访问类似于以下内容的 URL 之一:<host>:<port>/actuator/bindings/<bindingName>;spring-doc.cadn.net.cn

您还可以通过发布到同一 URL 来停止、启动、暂停和恢复单个绑定,同时提供state参数为 JSON,如以下示例所示:spring-doc.cadn.net.cn

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSEDRESUMED仅当相应的 binder 及其底层技术支持它时才有效。否则,你会在日志中看到警告消息。 目前,只有 Kafka 和 [Solace](github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) 绑定器支持PAUSEDRESUMED国家。

清理敏感数据

使用绑定执行器端点时,清理任何敏感数据(例如用户凭据、有关 SSL 密钥的信息等)有时至关重要。 为了实现这一点,最终用户应用程序可以提供一个SanitizingFunction从 Spring Boot 作为应用程序中的 bean 。 这是一个示例,用于在为 Apache Kafka 的sasl.jaas.config财产。spring-doc.cadn.net.cn

@Bean
public SanitizingFunction sanitizingFunction() {
	return sanitizableData -> {
		if (sanitizableData.getKey().equals("sasl.jaas.config")) {
			return sanitizableData.withValue("data-scrambled!!");
		}
		else {
			return sanitizableData;
		}
	};
}