对于最新稳定版本,请使用spring-cloud-stream 5.0.1spring-doc.cadn.net.cn

Mixing high level DSL and low level Processor API

Kafka Streams 提供两种变体的 API。它有一个类似高级 DSL 的 API,其中可以链接各种操作,这对许多函数式程序员来说可能是熟悉的。Kafka Streams 还提供了低级处理器 API。处理器 API 尽管非常强大,能够以更低级别的控制方式处理事物,但它是命令式的。Spring Cloud Stream Kafka 流绑定器允许您使用高级 DSL 或混合使用 DSL 和处理器 API。这两种变体的混合使用为您在应用程序中提供了很多选项来控制各种用例。应用程序可以调用 transformprocess 方法调用来访问处理器 API。spring-doc.cadn.net.cn

让我们看看如何在一个Spring Cloud Stream应用程序中使用process API,组合DSL和处理器API。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

这是一个使用transformAPI的示例。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

process 方法调用是终端操作,而transform 方法是非终端,并提供了一个可能转换后的KStream,您可以使用它继续使用 DSL 或处理器 API 进行进一步处理。spring-doc.cadn.net.cn