对于最新稳定版本,请使用spring-cloud-stream 5.0.1spring-doc.cadn.net.cn

StreamsBuilderFactoryBean 配置器

It is often required to customize the StreamsBuilderFactoryBean that creates the KafkaStreams objects. Based on the underlying support provided by Spring Kafka, the binder allows you to customize the StreamsBuilderFactoryBean. You can use the StreamsBuilderFactoryBeanConfigurer to customize the StreamsBuilderFactoryBean itself. Then, once you get access to the StreamsBuilderFactoryBean through this configurer, you can customize the corresponding KafkaStreams using KafkaStreamsCustomzier. Both of these customizers are part of the Spring for Apache Kafka project.spring-doc.cadn.net.cn

Here is an example of using the StreamsBuilderFactoryBeanConfigurer.spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上面显示的是您可以对 StreamsBuilderFactoryBean 所能做的自定义说明。您可以从 StreamsBuilderFactoryBean 调用任何可用的变异操作来定制它。此自定义器将在工厂 bean 启动前由绑定器调用。spring-doc.cadn.net.cn

一旦你获取到 StreamsBuilderFactoryBean 的访问权限,你还可以自定义底层的 KafkaStreams 对象。 以下是实现此目的的蓝图。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer 将在底层 KafkaStreams 启动前由 StreamsBuilderFactoryBeabn 调用。spring-doc.cadn.net.cn

在整个应用程序中,只能有一个StreamsBuilderFactoryBeanConfigurer。 那么,如何处理多个Kafka Streams处理器(每个都由独立的StreamsBuilderFactoryBean对象进行后备)呢?在这种情况下,如果针对这些处理器需要不同的自定义设置,应用程序需要根据应用程序ID应用一些过滤器。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

使用StreamsBuilderFactoryBeanConfigurer注册全局状态存储

我们上面提到,绑定器不提供注册全局状态存储的第一类方式作为功能。 要实现这一点,您需要使用 StreamsBuilderFactoryBeanConfigurer 的自定义程序。 这是这样做的方法。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        try {
            streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
                  @Override
                  public void configureBuilder(StreamsBuilder builder) {
                      builder.addGlobalStore(
                              ...
                      );
                  }
              });
        }
        catch (Exception e) {

        }
    };
}

在上面所示的代码中,对StreamsBuilder上的任何自定义都必须通过KafkaStreamsInfrastructureCustomizer完成。如果调用StreamsBuilderFactoryBean#getObject()来获取对StreamsBuilder对象的访问权,可能会因 bean 尚处于初始化状态而出现一些相互依赖问题。。spring-doc.cadn.net.cn

如果多个处理器,您要附加全局状态存储到右StreamsBuilder通过应用 ID 摘要筛选出其他StreamsBuilderFactoryBean对象。spring-doc.cadn.net.cn

使用StreamsBuilderFactoryBeanConfigurer注册生产异常处理程序

在错误处理部分,我们指出绑定器不提供处理生产异常的一流方式。 虽然这是事实,但您仍然可以使用StreamsBuilderFacotryBean定制器注册生产异常处理程序。请参见下文。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

再次,如果您的处理器有多个,您可能需要针对StreamsBuilderFactoryBean设置它。 您也可以使用配置属性(有关更多详细信息,请参阅下面),但如果您选择采用程序化方法,这是一项选择。spring-doc.cadn.net.cn