对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
Kafka Streams 应用程序中基于事件类型的路由
Kafka Streams 绑定器不支持基于常规消息通道的绑定程序中可用的路由函数。 但是,Kafka Streams 绑定器仍然通过入站记录上的事件类型记录头提供路由功能。
若要启用基于事件类型的路由,应用程序必须提供以下属性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes
.
这可以是逗号分隔的值。
例如,假设我们有这个函数:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我们还假设,如果传入记录的事件类型为foo
或bar
.
这可以使用以下方式表示eventTypes
属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
现在,当应用程序运行时,活页夹会检查每个传入记录中的标头event_type
并查看它的值是否设置为foo
或bar
.
如果找不到其中任何一个,则将跳过函数执行。
默认情况下,活页夹期望记录头键为event_type
,但可以根据绑定进行更改。
例如,如果我们想将此绑定上的标头键更改为my_event
而不是默认值,可以按如下方式更改。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
.
在 Kafkfa Streams 绑定器中使用事件路由功能时,它使用 byte 数组Serde
反序列化所有传入记录。
如果记录头与事件类型匹配,则只有它使用实际的Serde
使用配置的或推断的Serde
.
如果在绑定上设置反序列化异常处理程序,则会引入问题,因为预期的反序列化仅发生在堆栈中,从而导致意外错误。
为了解决此问题,您可以在绑定上设置以下属性,以强制绑定器使用配置或推断的Serde
而不是字节数组Serde
.
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
这样,应用程序可以在使用事件路由功能时立即检测反序列化问题,并可以做出适当的处理决策。