这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 spring-doc.cadn.net.cn

编程模型

在使用 Kafka Streams 绑定器提供的编程模型时,可以使用高​​级 Streams DSL 和高级与低级 处理器-API 的混合作为选项。当混合使用高级和低级 API 时,通常通过调用 transformprocessKStream 的 API 方法来实现。spring-doc.cadn.net.cn

函数式风格

从 Spring Cloud Stream 3.0.0 开始,Kafka Streams 绑定器允许应用程序使用 Java 8 中可用的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为类型 java.util.function.Functionjava.util.function.Consumer 的 lambda 表达式。spring-doc.cadn.net.cn

让我们看一个非常基础的例子。spring-doc.cadn.net.cn

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

虽然简单,但这是一个完整的独立Spring Boot应用程序,利用Kafka Streams进行流处理。这是一个没有出站绑定且只有一个入站绑定的消费者应用程序。该应用程序消耗数据,并将KStream键和值的信息记录到标准输出中。该应用程序包含SpringBootApplication注解和一个标记为Bean的方法。bean方法是类型java.util.function.Consumer,其参数化为KStream。然后在实现中,我们返回了一个Consumer对象,它基本上是一个lambda表达式。在lambda表达式内部提供了处理数据的代码。spring-doc.cadn.net.cn

在此应用程序中,有一个单一输入绑定,其类型为KStream
绑定器为此应用程序创建此绑定,并为其分配名称process-in-0,即函数bean名称后跟连字符(-)和字面量in,再跟另一个连字符以及参数的序数位置。
您使用此绑定名称来设置其他属性,例如目标。
例如,spring.cloud.stream.bindings.process-in-0.destination=my-topicspring-doc.cadn.net.cn

如果绑定上未设置目标属性,则会创建一个与绑定同名的主题(如果有足够的权限),或者期望该主题已经存在。

构建为一个 uber-jar 后(例如,kstream-consumer-app.jar),您可以像下面这样运行上面的例子。spring-doc.cadn.net.cn

如果应用程序选择使用Spring的Component注释来定义功能bean,则绑定器也支持该模型。
上述功能bean可以重写为如下所示。spring-doc.cadn.net.cn

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

这里是另一个完整的处理器示例,它同时具有输入和输出绑定。这是经典的单词计数示例,在该示例中,应用程序从主题接收数据,并在滚动时间窗口内计算每个单词出现的次数。spring-doc.cadn.net.cn

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

这里再次是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,bean 方法的类型是 java.util.function.Function
对于 Function 的第一个参数化类型用于输入 KStream,第二个类型用于输出。
在方法体中,提供了一个 lambda 表达式,其类型为 Function,并给出了实际业务逻辑作为实现。
类似于之前讨论过的基于 Consumer 的应用程序,这里的输入绑定默认命名为 process-in-0。对于输出,绑定名称也自动设置为 process-out-0spring-doc.cadn.net.cn

构建为一个可执行 jar 文件(例如,wordcount-processor.jar)后,您可以像下面这样运行上面的例子。spring-doc.cadn.net.cn

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将从 Kafka 主题 words 中获取消息,并将计算结果发布到输出主题 countsspring-doc.cadn.net.cn

Spring Cloud Stream 将确保来自入站和出站主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器所需的逻辑。设置 Kafka Streams 基础设施所需的特定配置由框架自动处理。spring-doc.cadn.net.cn

我们上面看到的两个示例都有一个KStream输入绑定。在这两种情况下,这些绑定都从单个主题接收记录。 如果您希望将多个主题多路复用到单个KStream绑定中,则可以在下面提供逗号分隔的Kafka主题作为目标。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3spring-doc.cadn.net.cn

另外,如果您希望将主题模式作为目标位置提供以与正则表达式匹配,则可以做到。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.destination=input.*spring-doc.cadn.net.cn

多重输入绑定

许多非平凡的 Kafka Streams 应用程序通常通过多个绑定从多个主题中消费数据。例如,一个主题被作为 Kstream 消费,而另一个主题则被作为 KTableGlobalKTable 消费。应用程序可能希望将数据以表格类型接收有许多原因。考虑一种使用场景,其中底层主题是通过数据库中的变更数据捕获(CDC)机制填充的,或者也许应用程序只关心下游处理的最新更新。如果应用程序指定数据需要绑定为KTableGlobalKTable,那么Kafka Streams绑定器会正确地将目的地绑定到KTableGlobalKTable,以便应用程序可以操作它们。我们将看看几种不同的场景,如何在Kafka Streams绑定器中处理多个输入绑定。spring-doc.cadn.net.cn

Kafka Streams绑定中的BiFunction

这里有一个包含两个输入和一个输出的示例。在这种情况下,应用程序可以利用java.util.function.BiFunctionspring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

再次,基本主题与前面的示例相同,但这里有两个输入。 Java 的 BiFunction 支持用于将输入绑定到所需的目标。 绑定器为输入生成的默认绑定名称分别为 process-in-0process-in-1。默认输出绑定是 process-out-0。 在此示例中,BiFunction 的第一个参数被绑定为第一个输入的 KStream,第二个参数被绑定为第二个输入的 KTablespring-doc.cadn.net.cn

BiConsumer 在 Kafka Streams 绑定中

如果有两个输入,但没有输出,则可以使用java.util.function.BiConsumer,如下所示。spring-doc.cadn.net.cn

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}

超过两个输入

如果需要超过两个输入怎么办?<br/>在某些情况下,您可能需要使用多于两个的输入。在这种情况下,绑定器允许您链接部分函数。<br/>用函数式编程术语来说,这种技术通常被称为柯里化(currying)。<br/>随着Java 8添加了函数式编程支持,现在Java可以编写柯里化函数。<br/>Spring Cloud Stream Kafka Streams绑定器可以利用此功能来实现多个输入绑定。spring-doc.cadn.net.cn

让我们看一个例子。spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

让我们看看上面提出的绑定模型的详细信息。在此模型中,我们对传入的数据应用了三个部分函数。让我们将它们称为f(x)f(y)f(z)。如果我们将这些函数扩展为真正的数学函数,它们看起来会像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>。变量x代表KStream<Long, Order>,变量y代表GlobalKTable<Long, Customer>,而变量z则代表GlobalKTable<Long, Product>。第一个函数 f(x) 具有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出是该函数 f(y)。函数f(y)具有应用程序的第二个输入绑定(GlobalKTable<Long, Customer>),并且其输出是另一个函数,f(z)。应用程序的第三个输入(GlobalKTable<Long, Product>)是该函数 f(z) 的输入,其输出为 KStream<Long, EnrichedOrder>,即应用程序的最终输出绑定。三个部分函数的输入分别为KStreamGlobalKTableGlobalKTable,您可以在方法体中使用它们来实现业务逻辑。spring-doc.cadn.net.cn

输入绑定分别命名为enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2。输出绑定命名为enrichOrder-out-0spring-doc.cadn.net.cn

使用柯里化函数,您实际上可以拥有任意数量的输入。但是请记住,如果超过少量输入,并且像上面在Java中那样部分应用这些函数,则可能导致代码难以阅读。<br/>因此,如果您的Kafka Streams应用程序需要比合理的小数量更多的输入绑定,并且想要使用这种功能模型,那么可能需要重新考虑您的设计并适当地分解应用程序。spring-doc.cadn.net.cn

输出绑定

Kafka Streams 绑定器允许输出绑定的类型为 KStreamKTable
在幕后,绑定器使用 to 方法在 KStream 上将生成的记录发送到输出主题。
如果应用程序在函数中提供 KTable 作为输出,则绑定器仍然通过委托给 to 方法来使用此技术。
spring-doc.cadn.net.cn

例如,下面两个函数都可以工作:spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

多个输出绑定

Kafka Streams 允许将传出数据写入多个主题。此功能在 Kafka Streams 中称为分支。
当使用多个输出绑定时,需要提供一个 KStream 数组(KStream[])作为传出返回类型。spring-doc.cadn.net.cn

这是一个示例:spring-doc.cadn.net.cn

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

编程模型保持不变,但是出站参数化类型是KStream[]。默认输出绑定名称分别是process-out-0process-out-1process-out-2,对应于上面的函数。
生成三个输出绑定的原因是因为绑定器检测到返回的KStream数组长度为三。
请注意,在此示例中我们提供了一个noDefaultBranch();如果使用了defaultBranch(),则需要额外的一个输出绑定,实际上返回的是一个长度为四的KStream数组。spring-doc.cadn.net.cn

关于Kafka Streams函数式编程风格的总结

总之,下表显示了在函数式编程中可以使用的各种选项。spring-doc.cadn.net.cn

输入数量 输出数量 要使用的组件

1spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

java.util.function.Consumerspring-doc.cadn.net.cn

2spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

java.util.function.BiConsumerspring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

1..nspring-doc.cadn.net.cn

java.util.function.Functionspring-doc.cadn.net.cn

2spring-doc.cadn.net.cn

1..nspring-doc.cadn.net.cn

java.util.function.BiFunctionspring-doc.cadn.net.cn

>= 3spring-doc.cadn.net.cn

0..nspring-doc.cadn.net.cn

使用柯里化函数spring-doc.cadn.net.cn

Kafka Streams绑定中的函数组合

Kafka Streams绑定支持线性拓扑的最小形式的功能组合。使用Java函数API支持,您可以编写多个函数,然后使用andThen方法自行组合它们。例如,假设您有以下两个函数。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

即使没有绑定器中的函数组合支持,您也可以如下组合这两个函数。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

然后您可以提供spring.cloud.function.definition=foo;bar;composed形式的定义。
借助绑定器中的函数组合支持,您无需编写这个第三个函数,在该函数中进行显式函数组合。spring-doc.cadn.net.cn

您可以这样做:<br>spring-doc.cadn.net.cn

spring.cloud.function.definition=foo|bar

您甚至可以这样做:<br/>spring-doc.cadn.net.cn

spring.cloud.function.definition=foo|bar;foo;bar

此示例中,组合函数的默认绑定名称变为foobar-in-0foobar-out-0spring-doc.cadn.net.cn

Kafka Streams绑定中函数组合的限制

当您有 java.util.function.Function 个可以与其他函数或多个函数组合的 Bean 时。 同一个函数 Bean 也可以与一个 java.util.function.Consumer 组合。在这种情况下,消费者是最后组合的组件。 一个函数可以与多个函数组合,然后也以一个 java.util.function.Consumer 的 Bean 结束。spring-doc.cadn.net.cn

在组合类型为 java.util.function.BiFunction 的 Bean 时,BiFunction 必须是定义中的第一个函数。
组合后的实体必须是 java.util.function.Functionjava.util.function.Consumer 类型之一。
换句话说,您不能先获取一个 BiFunction 类型的 Bean 然后与另一个 BiFunction 类型进行组合。spring-doc.cadn.net.cn

你不能将类型为BiConsumer的元素或以Consumer作为第一个组件的定义进行组合。
同样,除非这是定义中的最后一个组件,否则你也不能对输出为数组(KStream[]表示分支)的函数进行组合。spring-doc.cadn.net.cn

函数定义中的第一个Function也可以使用柯里化形式。
例如,如下写法是可行的。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函数定义可以是 curriedFoo|bar。 幕后,绑定器将为柯里化函数创建两个输入绑定,并基于定义中的最终函数创建一个输出绑定。 在这种情况下,默认的输入绑定将是 curriedFoobar-in-0curriedFoobar-in-1。 此示例的默认输出绑定变为 curriedFoobar-out-0spring-doc.cadn.net.cn

使用时的特别说明KTable作为函数组合的输出

假设您有两个以下函数。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

您可以将它们组合为foo|bar,但请记住第二个函数(在这种情况下是bar)必须有KTable作为输入,因为第一个函数(foo)输出的是KTablespring-doc.cadn.net.cn