|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
根据事件类型在 Kafka 流应用程序中进行路由
在常规消息通道绑定程序中可用的路由功能在 Kafka 流式处理程序绑定器中不受支持。但是,Kafka 流式处理程序绑定器仍然通过传入记录上的事件类型记录头提供路由能力。
要根据事件类型启用路由,应用程序必须提供以下属性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.
这是可以是一个逗号分隔值。
例如,假设我们有一个这个函数:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
让我们也假设,我们只希望在传入记录的事件类型为foo或bar时执行此函数中的业务逻辑。
可以使用绑定上的eventTypes属性来表示这一点。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
现在,当应用程序运行时,绑定器检查每个传入记录的标头event_type,看看它的值是否设置为foo或bar。如果找不到其中任何一个,则函数执行将被跳过。
默认情况下,绑定器期望记录头键为event_type,但可以根据每个绑定情况进行更改。
例如,如果我们想将此绑定的标题键更改为my_event而不是默认值,则可以按以下方式更改。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.
在使用 Kafka Streams 绑定器中的事件路由功能时,它使用字节数组 Serde 来反序列化所有传入的记录。如果记录头与事件类型匹配,则仅使用实际的 Serde 来进行适当的反序列化,该操作使用配置或推断出的 Serde。如果您在此绑定上设置反序列化异常处理器,则由于预期的反序列化只发生在堆栈中,因此可能会导致意外错误。为了解决此问题,您可以在绑定上设置以下属性以强制绑定器使用配置或推断出的 Serde 而不是字节数组 Serde。
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
这样,应用程序在使用事件路由功能时可以立即检测到反序列化问题,并可以采取适当的处理决策。