对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

StreamsBuilderFactoryBean 配置器

通常需要自定义StreamsBuilderFactoryBean这会创建KafkaStreams对象。 基于 Spring Kafka 提供的底层支持,binder 允许您自定义StreamsBuilderFactoryBean. 您可以使用StreamsBuilderFactoryBeanConfigurer自定义StreamsBuilderFactoryBean本身。 然后,一旦您访问了StreamsBuilderFactoryBean通过这个配置器,你可以自定义对应的KafkaStreamsKafkaStreamsCustomzier. 这两个定制器都是 Spring for Apache Kafka 项目的一部分。spring-doc.cadn.net.cn

下面是使用StreamsBuilderFactoryBeanConfigurer.spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上面显示为您可以执行的作以自定义StreamsBuilderFactoryBean. 您基本上可以从StreamsBuilderFactoryBean以自定义它。 此定制器将在工厂 bean 启动之前由 binder 调用。spring-doc.cadn.net.cn

一旦您访问了StreamsBuilderFactoryBean,您还可以自定义底层KafkaStreams对象。 这是这样做的蓝图。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer将被StreamsBuilderFactoryBeabn就在基础KafkaStreams开始。spring-doc.cadn.net.cn

只能有一个StreamsBuilderFactoryBeanConfigurer在整个应用程序中。 那么我们如何考虑多个 Kafka Streams 处理器,因为它们中的每个处理器都由单独备份StreamsBuilderFactoryBean对象? 在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些筛选器。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储

如上所述,绑定器不提供将全局状态存储注册为功能的第一类方法。 为此,您需要通过以下方式使用定制器StreamsBuilderFactoryBeanConfigurer. 这是如何做到这一点的。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        try {
            streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
                  @Override
                  public void configureBuilder(StreamsBuilder builder) {
                      builder.addGlobalStore(
                              ...
                      );
                  }
              });
        }
        catch (Exception e) {

        }
    };
}

StreamsBuilder必须通过KafkaStreamsInfrastructureCustomizer如上图所示。 如果StreamsBuilderFactoryBean#getObject()调用以访问StreamsBuilder对象,这可能无法作为 bean 在初始化中工作,因此会遇到一些循环依赖问题。spring-doc.cadn.net.cn

如果您有多个处理器,则需要将全局状态存储附加到右侧StreamsBuilder通过过滤掉另一个StreamsBuilderFactoryBean使用如上所述的应用程序 ID 的对象。spring-doc.cadn.net.cn

使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理程序

在错误处理部分,我们指出 binder 没有提供处理生产异常的第一类方法。 尽管如此,您仍然可以使用StreamsBuilderFactoryBeancustomizer 来注册生产异常处理程序。见下文。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

同样,如果您有多个处理器,您可能需要根据正确的处理器将其适当地设置为StreamsBuilderFactoryBean. 您也可以使用配置属性添加此类生产异常处理程序(有关更多信息,请参阅下文),但如果您选择使用编程方法,则这是一个选项。spring-doc.cadn.net.cn