有选择地手动启动 Kafka Streams 处理器
虽然上面列出的方法将无条件应用自动启动false
通过StreamsBuilderFactoryManager
,通常希望只有单独选择的 Kafka Streams 处理器不自动启动。
例如,假设您的应用程序中有三个不同的函数(处理器),并且对于其中一个处理器,您不希望将其作为应用程序启动的一部分来启动它。
这是这种情况的一个例子。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述方案中,如果将spring.kafka.streams.auto-startup
自false
,则在应用程序启动期间,没有任何处理器将自动启动。
在这种情况下,您必须按照上述方式以编程方式启动它们,方法是调用start()
在基础上StreamsBuilderFactoryManager
.
但是,如果我们有一个用例有选择地仅禁用一个处理器,那么您必须将auto-startup
在该处理器的单个绑定上。
让我们假设我们不想要我们的process3
自动启动功能。
这是一个BiFunction
具有两个输入绑定 -process3-in-0
和process3-in-1
.
为了避免此处理器自动启动,您可以选择这些输入绑定中的任何一个,并将auto-startup
在他们身上。
您选择哪种装订并不重要;如果您愿意,您可以设置auto-startup
自false
在他们两个上,但一个就足够了。
因为它们共享同一个工厂 bean,所以您不必在两个绑定上将 autoStartup 设置为 false,但为了清楚起见,这样做可能是有意义的。
这是可用于禁用此处理器的自动启动的 Spring Cloud Stream 属性。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然后,您可以使用 REST 端点或使用BindingsEndpoint
API 如下所示。
为此,您需要确保对类路径具有 Spring Boot 执行器依赖项。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
或
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
有关此机制的更多详细信息,请参阅参考文档中的此部分。
通过禁用来控制绑定时auto-startup 如本节所述,请注意,这仅适用于使用者绑定。
换句话说,如果您使用生产者绑定,process3-out-0 ,这在禁用处理器的自动启动方面没有任何影响,尽管此生产者绑定使用相同的StreamsBuilderFactoryBean 作为消费者绑定。 |