This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.2.1!

Binding visualization and control

Spring Cloud Stream supports visualization and control of bindings through Actuator endpoints as well as programmatically.

Programmatic way

Since version 3.1, we expose org.springframework.cloud.stream.binding.BindingsLifecycleController, which is registered as a bean and, once injected, can be used to control the lifecycle of individual bindings.

For example, look at the following fragment from one of the test cases. It retrieves BindingsLifecycleController from the Spring application context and calls individual methods to control the lifecycle of the echo-in-0 binding.

BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
// Alternative way of changing state: for convenience, we expose start/stop and pause/resume operations.
// bindingsController.stop("echo-in-0");
assertThat(binding.isRunning()).isFalse();

Define new and manage existing bindings

Additionally, starting with version 4.2, you can use BindingsLifecycleController to define new bindings and to modify the configuration of existing bindings by accessing their Consumer and Producer configuration properties, which allows more dynamic management of their values.

For example, to define a new input binding, call the BindingsLifecycleController.defineInputBinding(..) method, as shown below. An equivalent defineOutputBinding(..) method exists for output bindings.

BindingsLifecycleController controller = context.getBean(BindingsLifecycleController.class);
KafkaConsumerProperties consumerProperties = controller.defineInputBinding("test-input-binding");

You can then manage its properties by calling the getExtensionProperties(..) method.

KafkaConsumerProperties properties = controller.getExtensionProperties("test-input-binding");
Unlike binding names derived from function definition, explicitly defined bindings do not carry the in-0/out-0 suffix since they are not backed by an actual function.

The getExtensionProperties(..) operation is defined to ensure that you get the proper type of the configuration properties class, so, depending on the binder and binding used, you can safely cast your extension properties to the appropriate type. In our case, it is KafkaConsumerProperties.

Depending on the type of property you change, you may need to restart the binding for the change to take effect (as seen earlier).

Actuator

Since actuator and web are optional, you must first add one of the web dependencies and also add the actuator dependency manually. The following example shows how to add the dependency for the Web framework:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

The following example shows how to add the dependency for the WebFlux framework:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

You can add the Actuator dependency as follows:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
To run Spring Cloud Stream 2.0 apps in Cloud Foundry, you must add spring-boot-starter-web and spring-boot-starter-actuator to the classpath. Otherwise, the application will not start due to health check failures.

You must also enable the bindings actuator endpoints by setting the following property: --management.endpoints.web.exposure.include=bindings.

Once those prerequisites are satisfied, you should see the following in the logs when the application starts:

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

To visualize the current bindings, access the following URL: <host>:<port>/actuator/bindings

Alternatively, to see a single binding, access a URL similar to the following: <host>:<port>/actuator/bindings/<bindingName>
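
The two read-only URLs above can also be called from Java. The following is a minimal sketch that uses only the JDK's java.net.http API (Java 11 or later). The class name BindingsQueryRequests, its method names, and the http://localhost:8080 address are assumptions made for illustration; they are not part of Spring Cloud Stream.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Illustrative helper (hypothetical name) that builds GET requests for the bindings endpoint. */
final class BindingsQueryRequests {

    private BindingsQueryRequests() {
    }

    /** Builds GET <baseUrl>/actuator/bindings, which lists all bindings. */
    static HttpRequest allBindings(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings"))
                .GET()
                .build();
    }

    /** Builds GET <baseUrl>/actuator/bindings/<bindingName>, which shows a single binding. */
    static HttpRequest singleBinding(String baseUrl, String bindingName) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Assumed local address; the requests are only built and printed, not sent.
        String baseUrl = "http://localhost:8080";
        System.out.println(allBindings(baseUrl));
        System.out.println(singleBinding(baseUrl, "echo-in-0"));
    }
}
```

To actually send a request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and read the JSON from the response body.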

You can also stop, start, pause, and resume individual bindings by posting to the same URL while providing a state argument as JSON, as shown in the following examples:

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSED and RESUMED work only when the corresponding binder and its underlying technology support them. Otherwise, a warning message appears in the logs. Currently, only the Kafka and [Solace](github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) binders support the PAUSED and RESUMED states.
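
For callers that prefer Java over curl, the same state-change request can be sketched with the JDK's java.net.http API (Java 11 or later). The class name BindingStateRequests, the changeState method, and the http://localhost:8080 address are hypothetical and exist only for this illustration; the URL path, the Content-Type header, and the JSON body mirror the curl commands above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Illustrative helper (hypothetical name) that builds the state-change POST request. */
final class BindingStateRequests {

    private BindingStateRequests() {
    }

    /**
     * Builds POST <baseUrl>/actuator/bindings/<bindingName> with a JSON body
     * such as {"state":"STOPPED"}. The state is one of STOPPED, STARTED,
     * PAUSED, or RESUMED, as in the curl examples.
     */
    static HttpRequest changeState(String baseUrl, String bindingName, String state) {
        String json = "{\"state\":\"" + state + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        // Assumed local address; the request is only built and printed, not sent.
        System.out.println(changeState("http://localhost:8080", "myBindingName", "STOPPED"));
    }
}
```

Building the request separately from sending it keeps the sketch runnable without a live application. Sending it is a matter of passing the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).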

Sanitize Sensitive Data

When using the bindings actuator endpoint, it is sometimes critical to sanitize sensitive data such as user credentials, information about SSL keys, and so on. To achieve this, end-user applications can provide a SanitizingFunction from Spring Boot as a bean in the application. The following example scrambles the value provided for Apache Kafka's sasl.jaas.config property:

@Bean
public SanitizingFunction sanitizingFunction() {
	return sanitizableData -> {
		if (sanitizableData.getKey().equals("sasl.jaas.config")) {
			return sanitizableData.withValue("data-scrambled!!");
		}
		else {
			return sanitizableData;
		}
	};
}