编程模型
在使用 Kafka Streams 绑定器提供的编程模型时,可以使用高级 Streams DSL 和高级与低级 处理器-API 的混合作为选项。当混合使用高级和低级 API 时,通常通过调用 transform 或 process 上 KStream 的 API 方法来实现。
函数式风格
从 Spring Cloud Stream 3.0.0 开始,Kafka Streams 绑定器允许应用程序使用 Java 8 中可用的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为类型 java.util.function.Function 或 java.util.function.Consumer 的 lambda 表达式。
让我们看一个非常基础的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然简单,但这是一个完整的独立Spring Boot应用程序,利用Kafka Streams进行流处理。这是一个没有出站绑定且只有一个入站绑定的消费者应用程序。该应用程序消耗数据,并将KStream键和值的信息记录到标准输出中。该应用程序包含SpringBootApplication注解和一个标记为Bean的方法。bean方法是类型java.util.function.Consumer,其参数化为KStream。然后在实现中,我们返回了一个Consumer对象,它基本上是一个lambda表达式。在lambda表达式内部提供了处理数据的代码。
在此应用程序中,有一个单一输入绑定,其类型为KStream。
绑定器为此应用程序创建此绑定,并为其分配名称process-in-0,即函数bean名称后跟连字符(-)和字面量in,再跟另一个连字符以及参数的序数位置。
您使用此绑定名称来设置其他属性,例如目标。
例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic。
| 如果绑定上未设置目标属性,则会创建一个与绑定同名的主题(如果有足够的权限),或者期望该主题已经存在。 |
构建为一个 uber-jar 后(例如,kstream-consumer-app.jar),您可以像下面这样运行上面的例子。
如果应用程序选择使用Spring的Component注释来定义功能bean,则绑定器也支持该模型。
上述功能bean可以重写为如下所示。
@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这里是另一个完整的处理器示例,它同时具有输入和输出绑定。这是经典的单词计数示例,在该示例中,应用程序从主题接收数据,并在滚动时间窗口内计算每个单词出现的次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
这里再次是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,bean 方法的类型是 java.util.function.Function。
对于 Function 的第一个参数化类型用于输入 KStream,第二个类型用于输出。
在方法体中,提供了一个 lambda 表达式,其类型为 Function,并给出了实际业务逻辑作为实现。
类似于之前讨论过的基于 Consumer 的应用程序,这里的输入绑定默认命名为 process-in-0。对于输出,绑定名称也自动设置为 process-out-0。
构建为一个可执行 jar 文件(例如,wordcount-processor.jar)后,您可以像下面这样运行上面的例子。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此应用程序将从 Kafka 主题 words 中获取消息,并将计算结果发布到输出主题 counts。
Spring Cloud Stream 将确保来自入站和出站主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器所需的逻辑。设置 Kafka Streams 基础设施所需的特定配置由框架自动处理。
我们上面看到的两个示例都有一个KStream输入绑定。在这两种情况下,这些绑定都从单个主题接收记录。
如果您希望将多个主题多路复用到单个KStream绑定中,则可以在下面提供逗号分隔的Kafka主题作为目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
另外,如果您希望将主题模式作为目标位置提供以与正则表达式匹配,则可以做到。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多重输入绑定
许多非平凡的 Kafka Streams 应用程序通常通过多个绑定从多个主题中消费数据。例如,一个主题被作为 Kstream 消费,而另一个主题则被作为 KTable 或 GlobalKTable 消费。应用程序可能希望将数据以表格类型接收有许多原因。考虑一种使用场景,其中底层主题是通过数据库中的变更数据捕获(CDC)机制填充的,或者也许应用程序只关心下游处理的最新更新。如果应用程序指定数据需要绑定为KTable或GlobalKTable,那么Kafka Streams绑定器会正确地将目的地绑定到KTable或GlobalKTable,以便应用程序可以操作它们。我们将看看几种不同的场景,如何在Kafka Streams绑定器中处理多个输入绑定。
Kafka Streams绑定中的BiFunction
这里有一个包含两个输入和一个输出的示例。在这种情况下,应用程序可以利用java.util.function.BiFunction。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
再次,基本主题与前面的示例相同,但这里有两个输入。
Java 的 BiFunction 支持用于将输入绑定到所需的目标。
绑定器为输入生成的默认绑定名称分别为 process-in-0 和 process-in-1。默认输出绑定是 process-out-0。
在此示例中,BiFunction 的第一个参数被绑定为第一个输入的 KStream,第二个参数被绑定为第二个输入的 KTable。
BiConsumer 在 Kafka Streams 绑定中
如果有两个输入,但没有输出,则可以使用java.util.function.BiConsumer,如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果需要超过两个输入怎么办?<br/>在某些情况下,您可能需要使用多于两个的输入。在这种情况下,绑定器允许您链接部分函数。<br/>用函数式编程术语来说,这种技术通常被称为柯里化(currying)。<br/>随着Java 8添加了函数式编程支持,现在Java可以编写柯里化函数。<br/>Spring Cloud Stream Kafka Streams绑定器可以利用此功能来实现多个输入绑定。
让我们看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看看上面提出的绑定模型的详细信息。在此模型中,我们对传入的数据应用了三个部分函数。让我们将它们称为f(x)、f(y)和f(z)。如果我们将这些函数扩展为真正的数学函数,它们看起来会像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>。变量x代表KStream<Long, Order>,变量y代表GlobalKTable<Long, Customer>,而变量z则代表GlobalKTable<Long, Product>。第一个函数 f(x) 具有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出是该函数 f(y)。函数f(y)具有应用程序的第二个输入绑定(GlobalKTable<Long, Customer>),并且其输出是另一个函数,f(z)。应用程序的第三个输入(GlobalKTable<Long, Product>)是该函数 f(z) 的输入,其输出为 KStream<Long, EnrichedOrder>,即应用程序的最终输出绑定。三个部分函数的输入分别为KStream、GlobalKTable和GlobalKTable,您可以在方法体中使用它们来实现业务逻辑。
输入绑定分别命名为enrichOrder-in-0、enrichOrder-in-1和enrichOrder-in-2。输出绑定命名为enrichOrder-out-0。
使用柯里化函数,您实际上可以拥有任意数量的输入。但是请记住,如果超过少量输入,并且像上面在Java中那样部分应用这些函数,则可能导致代码难以阅读。<br/>因此,如果您的Kafka Streams应用程序需要比合理的小数量更多的输入绑定,并且想要使用这种功能模型,那么可能需要重新考虑您的设计并适当地分解应用程序。
输出绑定
Kafka Streams 绑定器允许输出绑定的类型为 KStream 或 KTable。
在幕后,绑定器使用 to 方法在 KStream 上将生成的记录发送到输出主题。
如果应用程序在函数中提供 KTable 作为输出,则绑定器仍然通过委托给 to 方法来使用此技术。
例如,下面两个函数都可以工作:
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
多个输出绑定
Kafka Streams 允许将传出数据写入多个主题。此功能在 Kafka Streams 中称为分支。
当使用多个输出绑定时,需要提供一个 KStream 数组(KStream[])作为传出返回类型。
这是一个示例:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();
return stringKStreamMap.values().toArray(new KStream[0]);
};
}
编程模型保持不变,但是出站参数化类型是KStream[]。默认输出绑定名称分别是process-out-0、process-out-1和process-out-2,对应于上面的函数。
生成三个输出绑定的原因是因为绑定器检测到返回的KStream数组长度为三。
请注意,在此示例中我们提供了一个noDefaultBranch();如果使用了defaultBranch(),则需要额外的一个输出绑定,实际上返回的是一个长度为四的KStream数组。
关于Kafka Streams函数式编程风格的总结
总之,下表显示了在函数式编程中可以使用的各种选项。
| 输入数量 | 输出数量 | 要使用的组件 |
|---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用柯里化函数 |
-
如果此表中有多个输出,则类型将简单地变为
KStream[]。
Kafka Streams绑定中的函数组合
Kafka Streams绑定支持线性拓扑的最小形式的功能组合。使用Java函数API支持,您可以编写多个函数,然后使用andThen方法自行组合它们。例如,假设您有以下两个函数。
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.peek((s, s2) -> {});
}
即使没有绑定器中的函数组合支持,您也可以如下组合这两个函数。
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
foo().andThen(bar());
}
然后您可以提供spring.cloud.function.definition=foo;bar;composed形式的定义。
借助绑定器中的函数组合支持,您无需编写这个第三个函数,在该函数中进行显式函数组合。
您可以这样做:<br>
spring.cloud.function.definition=foo|bar
您甚至可以这样做:<br/>
spring.cloud.function.definition=foo|bar;foo;bar
此示例中,组合函数的默认绑定名称变为foobar-in-0和foobar-out-0。
Kafka Streams绑定中函数组合的限制
当您有 java.util.function.Function 个可以与其他函数或多个函数组合的 Bean 时。
同一个函数 Bean 也可以与一个 java.util.function.Consumer 组合。在这种情况下,消费者是最后组合的组件。
一个函数可以与多个函数组合,然后也以一个 java.util.function.Consumer 的 Bean 结束。
在组合类型为 java.util.function.BiFunction 的 Bean 时,BiFunction 必须是定义中的第一个函数。
组合后的实体必须是 java.util.function.Function 或 java.util.function.Consumer 类型之一。
换句话说,您不能先获取一个 BiFunction 类型的 Bean 然后与另一个 BiFunction 类型进行组合。
你不能将类型为BiConsumer的元素或以Consumer作为第一个组件的定义进行组合。
同样,除非这是定义中的最后一个组件,否则你也不能对输出为数组(KStream[]表示分支)的函数进行组合。
函数定义中的第一个Function也可以使用柯里化形式。
例如,如下写法是可行的。
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
函数定义可以是 curriedFoo|bar。
幕后,绑定器将为柯里化函数创建两个输入绑定,并基于定义中的最终函数创建一个输出绑定。
在这种情况下,默认的输入绑定将是 curriedFoobar-in-0 和 curriedFoobar-in-1。
此示例的默认输出绑定变为 curriedFoobar-out-0。
使用时的特别说明KTable作为函数组合的输出
假设您有两个以下函数。
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
您可以将它们组合为foo|bar,但请记住第二个函数(在这种情况下是bar)必须有KTable作为输入,因为第一个函数(foo)输出的是KTable。