这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 spring-doc.cadn.net.cn

状态存储

当使用高级 DSL 并执行相应调用触发状态存储时,Kafka Streams 将自动创建状态存储。spring-doc.cadn.net.cn

如果您希望将传入的KTable绑定作为命名状态存储进行实例化,可以使用以下策略。spring-doc.cadn.net.cn

假设你有以下函数。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然后通过设置下面的属性,传入的KTable数据将会在命名状态存储中进行重构。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

你可以在应用程序中定义自定义状态存储作为bean,这些存储将被绑定程序检测到,并添加到Kafka流生成器中。 特别是当使用处理器api时,您需要手动注册一个状态存储。 为此,您可以在应用程序中创建StateStore作为bean的示例。spring-doc.cadn.net.cn

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

这些状态存储可以直接被应用程序访问。spring-doc.cadn.net.cn

在引导过程中,将由绑定器处理这些bean,并传递给Streams builder对象。spring-doc.cadn.net.cn

访问状态存储:spring-doc.cadn.net.cn

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

当涉及到注册全局状态存储时,这将无法工作。 要注册全局状态存储,请参阅下面关于自定义StreamsBuilderFactoryBean的部分。spring-doc.cadn.net.cn