对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
记录序列化和反序列化
Kafka Streams 绑定器允许您以两种方式序列化和反序列化记录。 一个是Kafka提供的原生序列化和反序列化工具,另一个是Spring Cloud Stream框架的消息转换能力。 让我们看看一些细节。
入站反序列化
密钥始终使用本机 Serdes 进行反序列化。
对于值,默认情况下,入站的反序列化由 Kafka 本机执行。 请注意,这是对以前版本的 Kafka Streams 绑定程序的默认行为的重大更改,其中反序列化是由框架完成的。
Kafka Streams 绑定器将尝试推断匹配Serde
类型通过查看java.util.function.Function|Consumer
.
这是它与 Serdes 匹配的顺序。
-
如果应用程序提供类型为
Serde
如果返回类型使用传入键或值类型的实际类型进行参数化,则它将使用Serde
用于入站反序列化。 例如,如果应用程序中有以下内容,则活页夹会检测到KStream
与在Serde
豆。 它将用于入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型,看看它们是否是 Kafka Streams 公开的类型之一。如果是这样,请使用它们。 以下是绑定器将尝试从 Kafka Streams 匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假定类型是 JSON 友好的。 如果您有多个值对象作为输入,这很有用,因为绑定器会在内部将它们推断为正确的 Java 类型。 在回退到
JsonSerde
不过,活页夹会以默认值Serde
s 设置,以查看它是否是Serde
它可以与传入的 KStream 类型匹配。
如果上述策略均无效,则应用程序必须提供Serde
通过配置。
这可以通过两种方式进行配置 - 绑定或默认。
首先,活页夹将查看Serde
在绑定级别提供。
例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,您可以提供绑定级别Serde
使用以下内容:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您提供Serde 作为每个输入绑定的 abover,那么这将具有更高的优先级,并且绑定器将远离任何Serde 推理。 |
如果您希望将默认键/值 Serdes 用于入站反序列化,则可以在 Binder 级别执行此作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你不想要 Kafka 提供的原生解码,可以依赖 Spring Cloud Stream 提供的消息转换功能。 由于原生解码是默认的,为了让 Spring Cloud Stream 反序列化入站值对象,您需要显式禁用原生解码。
例如,如果您拥有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要单独禁用所有输入的本机解码。否则,本机解码仍将应用于您未禁用的那些。
默认情况下,Spring Cloud Stream 将使用application/json
作为内容类型,并使用适当的 JSON 消息转换器。
可以使用以下属性和适当的MessageConverter
豆。
spring.cloud.stream.bindings.process-in-0.contentType
出站序列化
出站序列化几乎遵循与上述入站反序列化相同的规则。 与入站反序列化一样,与以前版本的 Spring Cloud Stream 相比,一个主要变化是出站的序列化由 Kafka 本机处理。 在 3.0 版本的 binder 之前,这是由框架本身完成的。
出站上的密钥始终由 Kafka 使用匹配Serde
这是由活页夹推断出来的。
如果它无法推断出键的类型,则需要使用配置来指定。
值 serde 使用用于入站反序列化的相同规则进行推断。
首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。
如果没有,它会检查它是否与Serde
被卡夫卡暴露,例如 -Integer
,Long
,Short
,Double
,Float
,byte[]
,UUID
和String
.
如果这不起作用,那么它就会回退到JsonSerde
由 Spring Kafka 项目提供,但先看默认的Serde
配置以查看是否存在匹配项。
请记住,所有这些都对应用程序透明地发生。
如果这些都不起作用,则用户必须提供Serde
按配置使用。
假设您正在使用相同的BiFunction
处理器,如上所述。然后,您可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推理失败,并且没有提供绑定级别 Serdes,则绑定器将回退到JsonSerde
,但请查看默认的 Serdes 以进行匹配。
默认 serde 的配置方式与上述反序列化下描述的相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果应用程序使用分支功能并具有多个输出绑定,则必须按绑定配置这些绑定。
同样,如果活页夹能够推断出Serde
类型,则无需执行此配置。
如果您不希望 Kafka 提供的本机编码,但想使用框架提供的消息转换,那么您需要显式禁用本机编码,因为本机编码是默认的。
例如,如果您拥有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情况下,您需要单独禁用所有输出的本机编码。否则,本机编码仍将应用于您未禁用的那些。
当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json
作为内容类型,并使用适当的 JSON 消息转换器。
可以使用以下属性和相应的MessageConverter
豆。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用本机编码/解码时,binder 将不会像本机 Serdes 那样进行任何推理。
应用程序需要显式提供所有配置选项。
因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用程序时使用反序列化的默认选项,并坚持使用 Kafka Streams 提供的本机反序列化。
您必须使用框架提供的消息转换功能的一个场景是,当您的上游生产者使用特定的序列化策略时。
在这种情况下,您需要使用匹配的反序列化策略,因为本机机制可能会失败。
当依赖默认值时Serde
机制,应用程序必须确保活页夹有一条前进的道路,以正确的方式正确映射入站和出站Serde
,否则事情可能会失败。
值得一提的是,上面概述的数据反序列化方法仅适用于处理器的边缘,即入站和出站。
您的业务逻辑可能仍需要调用显式需要的 Kafka Streams APISerde
对象。
这些仍然是应用程序的责任,必须由开发人员相应地处理。