对于最新稳定版本,请使用spring-cloud-stream 5.0.1spring-doc.cadn.net.cn

按需手动启动Kafka Streams处理器

尽管上面介绍的方法将无条件地通过StreamsBuilderFactoryManager将所有Kafka Streams处理器的自动启动设置为false,但通常只有选择性地使个别Kafka Streams处理器不自动启动是 desirable 的。例如,假设您的应用程序中有三个不同的函数(处理器),并且您不想在应用启动时启动其中一个处理器。下面是一个这种情况下示例。spring-doc.cadn.net.cn

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

在上面的场景中,如果您将spring.kafka.streams.auto-startup设置为false,那么在应用程序启动期间将没有任何处理器自动启动。如果出现这种情况,您必须按照上述通过在基础start()上调用StreamsBuilderFactoryManager来编程式启动它们。但是,如果我们有一个用例需要选择性地仅禁用一个处理器,那么您必须在该处理器的单独绑定上设置auto-startup。假设我们不想让我们的process3函数自动启动。这是一个带有两个输入绑定的BiFunction-process3-in-08。为了避免此处理器自动启动,您可以选择任何这些输入绑定并将其设置为 9 。 它不关心您选择哪个绑定; 如果您愿意, 可以为它们都设置为 10 到 11, 但只需要一个。因为它们共享同一个工厂 bean,所以您不需要在两个绑定上都将 autoStartup 设置为 false,但这样做可能有意义,为了清晰起见。spring-doc.cadn.net.cn

这里是您可以用于禁用此处理器自动启动的 Spring Cloud Stream 属性。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

再来,患你可以抽助REST结构(endpoint)或者Spring Boot actuator推写(API),并自动同步运行备注解推理型。对此,患你必修确保,Spring Boot actuator双文(jar)(dependency)在classpath下.spring-doc.cadn.net.cn

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

this section 从参考文档中的更多此机制的详细信息。spring-doc.cadn.net.cn

当按照本节中禁用auto-startup所述控制绑定时,请注意,这仅适用于消费者绑定。 换句话说,如果您使用生产者绑定,process3-out-0,则在禁用处理器自动启动方面没有任何效果,尽管此生产者绑定与使用相同的StreamsBuilderFactoryBean的消费者绑定相同。