|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
在 Kafka Streams 绑定中进行绑定可视化和控制
从版本3.1.2开始,Kafka Streams绑定支持绑定可视化和控制。
唯一受支持的生命周期阶段是STOPPED和STARTED。
生命周期阶段PAUSED和RESUMED在Kafka Streams绑定中不可用。
为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
如果你更喜欢使用webflux,那么你可以包含spring-boot-starter-webflux而不是标准的web依赖。
此外,您还需要设置以下属性:
management.endpoints.web.exposure.include=bindings
为了进一步说明此功能,让我们使用以下应用程序作为指南:
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
正如我们所见,应用程序有两个Kafka Streams函数——一个是消费者,另一个是函数。
默认情况下,消费者绑定的名称为consumer-in-0。
同样地,对于该函数,输入绑定是function-in-0,输出绑定是function-out-0。
应用程序启动后,我们可以使用以下绑定端点找到有关绑定的详细信息。
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
所有三个绑定的详细信息请参见上文。
现在让我们停止 consumer-in-0 绑定。
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
此时,通过此绑定将不会接收到任何记录。
重新开始绑定。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
当单个函数上有多个绑定时,调用这些操作中的任意一个绑定都可以工作。
这是因为在同一个函数上的所有绑定都由相同的StreamsBuilderFactoryBean支持。
因此,对于上面的函数来说,无论是function-in-0还是function-out-0都会起作用。