此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

混合使用高级 DSL 和低级处理器 API

Kafka Streams 提供了两种 API 变体。 它有一个更高级别的 DSL 类似 API,您可以在其中链接许多功能程序员可能熟悉的各种作。 Kafka Streams 还允许访问低级处理器 API。 处理器 API 虽然非常强大,并且能够在低得多的级别上控制事物,但本质上是势在必行的。 用于 Spring Cloud Stream 的 Kafka Streams 绑定器允许您使用高级 DSL 或混合使用 DSL 和处理器 API。 混合使用这两种变体可以为您提供许多选项来控制应用程序中的各种用例。 应用程序可以使用transformprocess方法 API 调用来访问处理器 API。spring-doc.cadn.net.cn

下面是如何使用process应用程序接口。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

这是一个使用transform应用程序接口。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

processAPI 方法调用是一个终端作,而transformAPI 是非终端的,可为您提供潜在的转换KStream使用它可以继续使用 DSL 或处理器 API 进行进一步处理。spring-doc.cadn.net.cn