|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
记录序列化和反序列化
Kafka Streams binder 允许您以两种方式对记录进行序列化和反序列化。一个是 Kafka 提供的本机序列化和反序列化设施,另一个是 Spring Cloud Stream 框架的消息转换功能。让我们看看一些细节。
反序列化输入
<keysAreAlwaysDeserializedUsingNativeSerdes/>
对于值,入站处的反序列化默认情况下由Kafka原生完成。请注意,这与Kafka Streams绑定器之前版本中的默认行为有重大变化,那时反序列化是由框架完成的。
Kafka Streams binder 将尝试通过查看 java.util.function.Function|Consumer 的类型签名来推断匹配的 Serde 类型。
它匹配 Serdes 的顺序如下:
-
如果应用程序提供了类型为
Serde的 bean,并且返回类型使用实际键或值类型参数化,那么它将为此使用该Serde进行反序列化。
例如,如果您在应用程序中具有以下内容,则检测器会发现KStream的传入值类型与Serdebean 的参数化类型匹配。
它将使用它进行反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型,看看它们是否是Kafka Streams公开的其中一个类型。如果是,就使用它们。<br /> 下面是绑定程序将从Kafka Streams匹配的Serde类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果Kafka Streams提供的所有Serdes都不匹配类型,那么它将使用Spring Kafka提供的JsonSerde。在这种情况下,绑定器假设这些类型是JSON友好的。 这在有多个值对象作为输入时很有用,因为绑定器将内部推断它们为正确的Java类型。 在转而回到
JsonSerde之前,绑定器会检查Kafka Streams配置中设置的默认Serdes,看看是否有一个Serde,它可以与传入KStream的类型相匹配。
如果上述策略均不奏效,那么应用必须通过配置提供零个Serde。这有两种配置方式——绑定或默认值。
First the binder will look if a Serde is provided at the binding level. For e.g. if you have the following processor,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,您可以使用以下内容提供级别Serde的绑定:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果为输入绑定提供如上所述的Serde,则该值将具有更高优先级,绑定器将避免任何Serde推断。 |
如果要在解压缩时使用默认密钥/值Serdes,可以在绑定程序级别执行此操作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果不想使用Kafka提供的原生解码,可以依赖Spring Cloud Stream提供的消息转换功能。 由于原生解码是默认的,为了使Spring Cloud Stream对入站值对象进行反序列化,需要显式地禁用原生解码。
例如,如果你使用上面相同的BiFunction处理器,那么spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
你需要为所有输入单独禁用原生解码。否则,对于未禁用的输入仍然会应用原生解码。
默认情况下,Spring Cloud Stream 将使用 application/json 作为内容类型,并使用适当的 json 消息转换器。您可以通过使用以下属性和相应的 MessageConverter Bean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
Outbound serialization
外出序列化基本上遵循上述与 inbound 反序列化相同的规则。 与 inbound 反序列化类似,从 Spring Cloud Stream 3.0 版本之前的 binder 开始的一个重大变化是,outbound 的序列化现在由 Kafka 原生处理。 在 3.0 版本之前的 binder 中,这是由框架本身处理的。
Keys on the outbound are always serialized by Kafka using a matching Serde that is inferred by the binder. If it can’t infer the type of the key, then that needs to be specified using configuration.
值序列化器和反序列化器是使用与入站反序列化相同的规则推断出来的。
首先,它会匹配检查出站类型是否来自应用程序中提供的bean。
如果没有,则检查其是否与Kafka公开的Serde之一匹配,例如:Integer、Long、Short、Double、Float、byte[]、UUID和String。
如果这也不行,那么将回退到Spring Kafka项目提供的JsonSerde,但在那之前,请先查看默认的Serde配置以确定是否有匹配项。
请记住,所有这些操作对应用程序来说都是透明的。
如果以上都不起作用,用户必须通过配置提供所需的Serde。
假设您正在使用与上述相同的BiFunction处理器。那么,您可以按照以下方式配置出站键/值Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推断失败,并且未提供绑定级别的 Serdes,则 binder 将回退到JsonSerde,但会查找默认 Serdes 是否匹配。
默认的序列化/反序列化器配置方式与上文中描述的反序列化部分相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用程序使用分支功能并且具有多个输出绑定,则必须为每个绑定进行配置。再次强调,如果绑定程序能够推断出Serde类型,则不需要进行此配置。
如果您不希望使用 Kafka 提供的原生编码,而想使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认选项。
例如,如果您的处理器与上面相同,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情况下,您需要为所有输出单独禁用原生编码。否则,对于未禁用的输出仍将应用原生编码。
当Spring Cloud Stream进行转换时,默认情况下,它将使用application/json作为内容类型,并使用适当的JSON消息转换器。
您可以使用以下属性和相应的MessageConverter Bean来使用自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用原生编码/解码时,binder 不会进行任何推断,这与使用原生序列化/反序列化的情况不同。
应用程序需要显式提供所有配置选项。
因此,通常建议对于序列化和反序列化的默认选项保持不变,并且在编写 Spring Cloud Stream Kafka Streams 应用程序时坚持使用 Kafka Streams 提供的原生反序列化。
你必须使用框架提供的消息转换功能的一个场景是,当你上游生产者正在使用特定的序列化策略时。
在这种情况下,你想使用匹配的反序列化策略,因为原生机制可能会失败。
当依赖于默认的Serde机制时,应用程序必须确保 binder 能够正确地将入站和出站映射到合适的Serde,否则事情可能会失败。
值得一提的是,上述数据序列化和反序列化的方案仅适用于处理器的边缘,即传入和传出。您的业务逻辑可能仍需要调用显式需要Serde对象的Kafka Streams API。这些仍然属于应用程序的责任,并且必须由开发人员相应地处理。