对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

编程模型

当使用 Kafka Streams binder 提供的编程模型时,高级 Streams DSL 和高级和低级 Processor-API 的混合都可以用作选项。 当混合更高级别和较低级别的 API 时,这通常是通过调用transformprocessAPI 方法KStream.spring-doc.cadn.net.cn

功能风格

从 Spring Cloud Stream 开始3.0.0,Kafka Streams 绑定器允许使用 Java 8 中可用的函数式编程风格来设计和开发应用程序。 这意味着应用程序可以简洁地表示为类型的 lambda 表达式java.util.function.Functionjava.util.function.Consumer.spring-doc.cadn.net.cn

让我们举一个非常基本的例子。spring-doc.cadn.net.cn

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

虽然很简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。 这是一个没有出站绑定且只有一个入站绑定的使用者应用程序。 应用程序使用数据,它只是记录来自KStream标准输出上的键和值。 该应用程序包含SpringBootApplication注释和标记为Bean. bean 方法的类型为java.util.function.Consumer其中参数化为KStream. 然后在实现中,我们返回一个 Consumer 对象,该对象本质上是一个 lambda 表达式。 在 lambda 表达式中,提供了用于处理数据的代码。spring-doc.cadn.net.cn

在此应用程序中,有一个类型的单个输入绑定KStream. 活页夹为应用程序创建此绑定,名称为process-in-0,即函数 bean name 的名称后跟破折号字符 () 和文字-in后跟另一个破折号,然后是参数的序号位置。 使用此绑定名称可以设置其他属性,例如目标。 例如spring.cloud.stream.bindings.process-in-0.destination=my-topic.spring-doc.cadn.net.cn

如果未在绑定上设置目标属性,那么将创建一个与绑定同名的主题(如果应用程序有足够的权限),或者该主题应该已经可用。

一旦构建为 uber-jar(例如kstream-consumer-app.jar),您可以像下面一样运行上面的示例。spring-doc.cadn.net.cn

如果应用程序选择使用 Spring 的Component注释,则活页夹也支持该模型。 上面的功能 bean 可以重写如下。spring-doc.cadn.net.cn

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

这是另一个示例,它是一个具有输入和输出绑定的完整处理器。 这是经典的字数计数示例,其中应用程序从主题接收数据,然后在翻转时间窗口中计算每个单词的出现次数。spring-doc.cadn.net.cn

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

同样,这是一个完整的 Spring Boot 应用程序。这里与第一个应用程序的不同之处在于 bean 方法是java.util.function.Function. 第一个参数化类型Function用于输入KStream第二个是输出。 在方法体中,提供了一个 lambda 表达式,该表达式类型为Function作为实现,给出了实际的业务逻辑。 与前面讨论的基于消费者的应用程序类似,此处的输入绑定被命名为process-in-0默认情况下。对于输出,绑定名称也会自动设置为process-out-0.spring-doc.cadn.net.cn

一旦构建为 uber-jar(例如wordcount-processor.jar),您可以像下面一样运行上面的示例。spring-doc.cadn.net.cn

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将使用 Kafka 主题中的消息words并将计算结果发布到输出 主题counts.spring-doc.cadn.net.cn

Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写逻辑 处理器中需要。设置 Kafka Streams 基础架构所需的 Kafka Streams 特定配置 由框架自动处理。spring-doc.cadn.net.cn

我们在上面看到的两个示例有一个KStream输入绑定。在这两种情况下,绑定都从单个主题接收记录。 如果要将多个主题多路复用为一个主题KStreambinding,则可以在下方提供逗号分隔的 Kafka 主题作为目标。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3spring-doc.cadn.net.cn

此外,如果要将主题与常规 Exresession 进行匹配,还可以将主题模式提供为目标。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.destination=input.*spring-doc.cadn.net.cn

多个输入绑定

许多重要的 Kafka Streams 应用程序通常通过多个绑定使用来自多个主题的数据。 例如,一个主题被消耗为Kstream另一个作为KTableGlobalKTable. 应用程序可能希望将数据作为表类型接收的原因有很多。 考虑一个用例,其中基础主题是通过数据库中的更改数据捕获 (CDC) 机制填充的,或者应用程序可能只关心下游处理的最新更新。 如果应用程序指定数据需要绑定为KTableGlobalKTable,则 Kafka Streams 绑定器会正确地将目标绑定到KTableGlobalKTable并使它们可供应用程序作。 我们将研究几种不同的场景,了解如何在 Kafka Streams 绑定器中处理多个输入绑定。spring-doc.cadn.net.cn

Kafka Streams Binder 中的 BiFunction

这是一个示例,我们有两个输入和一个输出。在这种情况下,应用程序可以利用java.util.function.BiFunction.spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

同样,基本主题与前面的示例相同,但这里我们有两个输入。 Java 的BiFunction支持用于将输入绑定到所需的目标。 绑定器为输入生成的默认绑定名称为process-in-0process-in-1分别。默认输出绑定为process-out-0. 在此示例中,第一个参数BiFunction被绑定为KStream对于第一个输入,第二个参数被绑定为KTable对于第二个输入。spring-doc.cadn.net.cn

Kafka Streams Binder 中的 BiConsumer

如果有两个输入,但没有输出,在这种情况下,我们可以使用java.util.function.BiConsumer如下图所示。spring-doc.cadn.net.cn

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}

超过两个输入

如果您有两个以上的输入怎么办? 在某些情况下,您需要两个以上的输入。在这种情况下,活页夹允许您链接部分函数。 在函数式编程术语中,这种技术通常称为柯里化。 随着 Java 8 中添加的函数式编程支持,Java 现在允许您编写柯里函数。 Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。spring-doc.cadn.net.cn

让我们看一个例子。spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

让我们看看上面介绍的绑定模型的细节。 在此模型中,我们在入站上有 3 个部分应用的函数。让我们将它们称为f(x),f(y)f(z). 如果我们在真正的数学函数意义上扩展这些函数,它将如下所示:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>. 这x变量代表KStream<Long, Order>y变量代表GlobalKTable<Long, Customer>z变量代表GlobalKTable<Long, Product>. 第一个功能f(x)具有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出是函数 f(y)。 功能f(y)具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>),其输出是另一个函数,f(z). 函数的输入f(z)是应用程序的第三个输入 (GlobalKTable<Long, Product>),其输出为KStream<Long, EnrichedOrder>这是应用程序的最终输出绑定。 来自三个部分函数的输入,即KStream,GlobalKTable,GlobalKTable分别在方法主体中可供您使用,用于将业务逻辑实现为 lambda 表达式的一部分。spring-doc.cadn.net.cn

输入绑定命名为enrichOrder-in-0,enrichOrder-in-1enrichOrder-in-2分别。输出绑定命名为enrichOrder-out-0.spring-doc.cadn.net.cn

使用柯里函数,您几乎可以拥有任意数量的输入。但是,请记住,在 Java 中,超过较少数量的输入和部分应用的函数都可能导致代码不可读。 因此,如果您的 Kafka Streams 应用程序需要的输入绑定数量超过相当少的数量,并且您想使用此函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。spring-doc.cadn.net.cn

输出绑定

Kafka Streams 绑定器允许KStreamKTable作为输出绑定。 在幕后,活页夹使用to方法KStream将生成的记录发送到输出主题。 如果应用程序提供KTable作为函数中的输出,Binder 仍然通过委托给to方法KStream.spring-doc.cadn.net.cn

例如,以下两个功能都可以工作:spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

Multiple Output Bindings

Kafka Streams allows writing outbound data into multiple topics. This feature is known as branching in Kafka Streams. When using multiple output bindings, you need to provide an array of KStream (KStream[]) 作为出站返回类型。spring-doc.cadn.net.cn

这是一个例子:spring-doc.cadn.net.cn

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

编程模型保持不变,但出站参数化类型为KStream[]. 默认输出绑定名称为process-out-0,process-out-1,process-out-2分别用于上述函数。binder 之所以生成三个输出绑定,是因为它检测了返回的KStream数组为 3。请注意,在此示例中,我们提供了一个noDefaultBranch(); 如果我们使用defaultBranch()相反,这将需要额外的输出绑定,本质上返回一个KStream长度为四的数组。spring-doc.cadn.net.cn

Kafka 流的基于函数的编程风格摘要

总之,下表显示了可在函数范式中使用的各种选项。spring-doc.cadn.net.cn

输入数 输出数量 要使用的组件

1spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

java.util.function.消费者spring-doc.cadn.net.cn

2spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

java.util.function.Bi消费者spring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

1..nspring-doc.cadn.net.cn

java.util.function.函数spring-doc.cadn.net.cn

2spring-doc.cadn.net.cn

1..nspring-doc.cadn.net.cn

java.util.function.Bi函数spring-doc.cadn.net.cn

>= 3spring-doc.cadn.net.cn

0..nspring-doc.cadn.net.cn

使用柯里化函数spring-doc.cadn.net.cn

Kafka Streams 绑定器中的函数组合

Kafka Streams binder supports minimal forms of functional composition for linear topologies. Using the Java functional API support, you can write multiple functions and then compose them on your own using the andThen method. For example, assume that you have the following two functions.spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

Even without the functional composition support in the binder, you can compose these two functions as below.spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

Then you can provide definitions of the form spring.cloud.function.definition=foo;bar;composed. With the functional composition support in the binder, you don’t need to write this third function in which you are doing explicit function composition.spring-doc.cadn.net.cn

您可以简单地执行以下作:spring-doc.cadn.net.cn

spring.cloud.function.definition=foo|bar

您甚至可以这样做:spring-doc.cadn.net.cn

spring.cloud.function.definition=foo|bar;foo;bar

在此示例中,组合函数的默认绑定名称将变为foobar-in-0foobar-out-0.spring-doc.cadn.net.cn

Kafka Streams 绑定器中功能组合的局限性

当你有java.util.function.Functionbean,可以与另一个函数或多个函数组合。相同的函数 bean 可以用java.util.function.Consumer也。 在这种情况下,消费者是最后一个组成的组件。一个函数可以由多个函数组成,然后以java.util.function.Consumer豆子也是如此。spring-doc.cadn.net.cn

在组合类型java.util.function.BiFunctionBiFunction必须是定义中的第一个函数。组合的实体必须是java.util.function.Functionjava.util.function.Consumer. 换句话说,您不能将BiFunctionbean 然后与另一个BiFunction.spring-doc.cadn.net.cn

不能使用BiConsumer或定义,其中Consumer是第一个组件。您也不能使用输出为数组 (KStream[]用于分支),除非这是定义中的最后一个组件。spring-doc.cadn.net.cn

第一个FunctionBiFunction在函数定义中也可以使用柯里形式。例如,以下情况是可能的。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函数定义可以是curriedFoo|bar. 在后台,绑定器将为柯里函数创建两个输入绑定,以及一个基于定义中最终函数的输出绑定。在这种情况下,默认输入绑定将是curriedFoobar-in-0curriedFoobar-in-1. 此示例的默认输出绑定将变为curriedFoobar-out-0.spring-doc.cadn.net.cn

使用特别注意事项KTable作为函数组合中的输出

假设您有以下两个功能。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

您可以将它们组合为foo|bar,但请记住,第二个函数 (bar在这种情况下)必须有一个KTable作为输入,因为第一个函数 (foo) 有KTable作为输出。spring-doc.cadn.net.cn