对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
Spring Cloud Stream Schema 注册表
介绍
当组织具有基于消息传递的发布/订阅架构并且多个生产者和消费者微服务相互通信时,所有这些微服务通常需要就基于架构的契约达成一致。 当此类模式需要演变以适应新的业务需求时,仍需要现有组件继续工作。 Spring Cloud Stream 支持独立的模式注册表服务器,应用程序可以使用该服务器注册和使用上述模式。 Spring Cloud Stream 模式注册表支持还提供了对基于 avro 的模式注册表客户端的支持,这些客户端本质上提供了与模式注册表通信的消息转换器,以便在消息转换期间协调模式。 Spring Cloud Stream 提供的模式演进支持既适用于上述独立模式注册表,也适用于 Confluent 提供的专门与 Apache Kafka 配合使用的模式注册表。
Spring Cloud Stream Schema Registry 概述
Spring Cloud Stream Schema Registry 提供对模式演变的支持,以便数据可以随着时间的推移而演变,并且仍然可以与较旧或较新的生产者和消费者一起使用,反之亦然。 大多数序列化模型,尤其是那些旨在跨不同平台和语言可移植的模型,都依赖于描述如何在二进制有效负载中序列化数据的模式。 为了序列化数据,然后解释数据,发送端和接收端都必须有权访问描述二进制格式的架构。 在某些情况下,可以从序列化时的有效负载类型或反序列化时的目标类型推断出架构。 但是,许多应用程序受益于访问描述二进制数据格式的显式模式。 架构注册表允许您以文本格式(通常是 JSON)存储架构信息,并使需要它以二进制格式接收和发送数据的各种应用程序可以访问该信息。 模式可作为元组引用,该元组由以下部分组成:
-
主题,即架构的逻辑名称
-
架构版本
-
架构格式,描述数据的二进制格式
Spring Cloud Stream Schema Registry 提供以下组件
-
独立架构注册表服务器
By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
-
能够通过与架构注册表通信来进行消息封送的架构注册表客户端。
Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.
架构注册表客户端
与架构注册表服务器交互的客户端抽象是SchemaRegistryClient
接口,其结构如下:
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream 提供了开箱即用的实现,用于与自己的模式服务器交互以及与 Confluent Schema Registry 交互。
可以使用@EnableSchemaRegistryClient
如下:
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
默认转换器经过优化,不仅可以缓存来自远程服务器的模式,还可以缓存parse() 和toString() 方法,相当昂贵。
因此,它使用DefaultSchemaRegistryClient 这不会缓存响应。
如果打算更改默认行为,可以直接在代码上使用客户端并将其覆盖为所需的结果。
为此,您必须添加属性spring.cloud.stream.schemaRegistryClient.cached=true 到您的应用程序属性。 |
架构注册表客户端属性
Schema Registry Client 支持以下属性:
spring.cloud.stream.schemaRegistryClient.endpoint
-
模式服务器的位置。 设置此设置时,请使用完整的 URL,包括协议 (
http
或https
) 、端口和上下文路径。 - 默认值
spring.cloud.stream.schemaRegistryClient.cached
-
客户端是否应缓存架构服务器响应。 通常设置为
false
,因为缓存发生在消息转换器中。 使用架构注册表客户端的客户端应将此设置为true
. - 默认值
-
false
Avro Schema Registry 客户端消息转换器
对于在应用程序上下文中注册了 SchemaRegistryClient bean 的应用程序,Spring Cloud Stream 会自动配置 Apache Avro 消息转换器以进行模式管理。 这简化了架构演变,因为接收消息的应用程序可以轻松访问可以与自己的读取器架构协调的编写器架构。
对于出站邮件,如果绑定的内容类型设置为application/*+avro
这MessageConverter
已激活,如以下示例所示:
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
在出站转换期间,消息转换器尝试推断每个出站消息的架构(基于其类型),并使用SchemaRegistryClient
.
如果已找到相同的模式,则检索对它的引用。
如果没有,则注册架构,并提供新的版本号。
该消息将与contentType
标头,使用以下方案:application/[prefix].[subject].v[version]+avro
哪里prefix
是可配置的,并且subject
是从有效负载类型推断出来的。
例如,类型为User
可能作为内容类型为application/vnd.user.v2+avro
哪里user
是主语,并且2
是版本号。
接收消息时,转换器从传入消息的标头推断出架构引用并尝试检索它。架构在反序列化过程中用作编写器架构。
Avro Schema 注册表消息转换器属性
如果已通过将spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
,您可以通过设置以下属性来自定义注册的行为。
- spring.cloud.stream.schema.avro.dynamicSchemaGeneration启用
-
如果您希望转换器使用反射从 POJO 推断 Schema,请启用。
违约:
false
- spring.cloud.stream.schema.avro.readerSchema
-
Avro 通过查看写入器架构(源有效负载)和读取器架构(应用程序有效负载)来比较架构版本。有关详细信息,请参阅 Avro 文档。 如果设置,这将覆盖架构服务器上的任何查找,并使用本地架构作为读取器架构。 违约:
null
- spring.cloud.stream.schema.avro.schemaLocations
-
注册任何
.avsc
此属性中列出的文件与架构服务器。违约:
empty
- spring.cloud.stream.schema.avro.prefix
-
要在 Content-Type 标头上使用的前缀。
违约:
vnd
- spring.cloud.stream.schema.avro.subject命名策略
-
确定用于在架构注册表中注册 Avro 架构的使用者名称。有两种实现可用,
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
,其中主题是架构名称,以及org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
,它使用 Avro 架构命名空间和名称返回完全限定的主题。可以通过实现org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
.违约:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
- spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
-
忽略任何架构注册表通信。对于测试目的很有用,以便在运行单元测试时,它不会不必要地尝试连接到 Schema Registry 服务器。
违约:
false
Apache Avro 消息转换器
Spring Cloud Stream 通过其spring-cloud-stream-schema-registry-client
模块。
目前,基于架构的消息转换器唯一支持的开箱即用序列化格式是 Apache Avro,未来版本中将添加更多格式。
这spring-cloud-stream-schema-registry-client
模块包含两种类型的消息转换器,可用于 Apache Avro 序列化:
-
使用序列化或反序列化对象的类信息或具有启动时已知位置的架构的转换器。
-
使用架构注册表的转换器。它们在运行时定位架构,并随着域对象的发展动态注册新架构。
支持架构的转换器
这AvroSchemaMessageConverter
支持通过使用预定义的模式或使用类中可用的模式信息(反射式或包含在SpecificRecord
).
如果您提供定制转换器,那么不会创建缺省的 AvroSchemaMessageConverter bean。
以下示例显示了一个自定义转换器:
要使用自定义转换器,您只需将其添加到应用程序上下文中,并可选择指定一个或多个MimeTypes
与它相关联。
默认值MimeType
是application/avro
.
如果转化的目标类型是GenericRecord
,则必须设置模式。
以下示例演示如何通过注册 Apache Avro 在接收器应用程序中配置转换器MessageConverter
没有预定义的模式。
在此示例中,请注意 MIME 类型值为avro/bytes
,而不是默认值application/avro
.
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
相反,以下应用程序使用预定义的模式(在类路径上找到)注册转换器:
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
架构注册表服务器
Spring Cloud Stream 提供了模式注册表服务器实现。
要使用它,您可以下载最新的spring-cloud-stream-schema-registry-server
将其作为独立应用程序发布并运行:
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
您可以将模式注册表嵌入到现有的 Spring Boot Web 应用程序中。
为此,请将
|
这spring.cloud.stream.schema.server.path
属性可用于控制架构服务器的根路径(尤其是当它嵌入到其他应用程序中时)。
这spring.cloud.stream.schema.server.allowSchemaDeletion
boolean 属性允许删除模式。默认情况下,这是禁用的。
模式注册表服务器使用关系数据库来存储模式。 默认情况下,它使用嵌入式数据库。 您可以使用 Spring Boot SQL 数据库和 JDBC 配置选项自定义架构存储。
架构注册表服务器API
Schema Registry Server API由以下作组成:
-
POST /
— 见Registering a New Schema
-
GET /{subject}/{format}/{version}
— 见Retrieving an Existing Schema by Subject, Format, and Version
-
GET /{subject}/{format}
— 见Retrieving an Existing Schema by Subject and Format
-
GET /schemas/{id}
— 见Retrieving an Existing Schema by ID
-
DELETE /{subject}/{format}/{version}
— 见Deleting a Schema by Subject, Format, and Version
-
DELETE /schemas/{id}
— 见Deleting a Schema by ID
-
DELETE /{subject}
— 见Deleting a Schema by Subject
注册新架构
要注册新架构,请发送POST
请求到端点。/
接受包含以下字段的 JSON 有效负载:/
-
subject
:架构主题 -
format
:架构格式 -
definition
:架构定义
它的响应是 JSON 中的模式对象,具有以下字段:
-
id
:架构 ID -
subject
:架构主题 -
format
:架构格式 -
version
:架构版本 -
definition
:架构定义
按主题、格式和版本检索现有模式
要按主题、格式和版本检索现有架构,请发送GET
请求{subject}/{format}/{version}
端点。
它的响应是 JSON 中的模式对象,具有以下字段:
-
id
:架构 ID -
subject
:架构主题 -
format
:架构格式 -
version
:架构版本 -
definition
:架构定义
按主题和格式检索现有模式
要按主题和格式检索现有架构,请发送GET
请求/subject/format
端点。
其响应是 JSON 中每个架构对象的架构列表,其中包含以下字段:
-
id
:架构 ID -
subject
:架构主题 -
format
:架构格式 -
version
:架构版本 -
definition
:架构定义
按 ID 检索现有架构
要按架构的 ID 检索架构,请发送GET
请求/schemas/{id}
端点。
它的响应是 JSON 中的模式对象,具有以下字段:
-
id
:架构 ID -
subject
:架构主题 -
format
:架构格式 -
version
:架构版本 -
definition
:架构定义
按主题删除架构
DELETE /{subject}
按主题删除现有架构。
此说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。
Spring Cloud Stream 1.1.0.RELEASE 使用表名schema ,用于存储Schema 对象。Schema 是许多数据库实现中的关键字。
为了避免将来发生任何冲突,从 1.1.1.RELEASE 开始,我们选择了名称SCHEMA_REPOSITORY 用于存储表。
任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应在升级之前将其现有模式迁移到新表。 |
使用 Confluent 的架构注册表
默认配置会创建一个DefaultSchemaRegistryClient
豆。
如果要使用 Confluent 模式注册表,则需要创建一个类型为ConfluentSchemaRegistryClient
,它取代了框架默认配置的那个。以下示例显示了如何创建这样的 bean:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
ConfluentSchemaRegistryClient 针对 Confluent 平台版本 4.0.0 进行了测试。 |
架构注册过程(序列化)
注册过程的第一部分是从通过通道发送的有效负载中提取架构。
Avro 类型,例如SpecificRecord
或GenericRecord
已经包含一个模式,可以立即从实例中检索。
对于 POJO,如果spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
属性设置为true
(默认值)。
一旦获得模式,转换器就会从远程服务器加载其元数据(版本)。 首先,它查询本地缓存。如果未找到结果,它将数据提交到服务器,服务器会回复版本控制信息。 转换器始终缓存结果,以避免查询 Schema Server 以获取需要序列化的每条新消息的开销。
使用架构版本信息,转换器将contentType
报头来携带版本信息——例如:application/vnd.user.v1+avro
.
模式解析过程(反序列化)
读取包含版本信息的消息时(即contentType
标头,其方案如下所述Schema Registration Process (Serialization)
,转换器查询架构服务器以获取消息的编写器架构。
一旦它找到传入消息的正确架构,它就会检索读取器架构,并通过使用 Avro 的架构解析支持将其读取到读取器定义中(设置默认值和任何缺少的属性)。
应了解写入器架构(写入消息的应用程序)和读取器架构(接收应用程序)之间的区别。
我们建议花点时间阅读 Avro 术语并了解该过程。
Spring Cloud Stream 始终获取编写器模式以确定如何读取消息。
如果你想让 Avro 的模式演进支持正常工作,你需要确保readerSchema 已为您的应用程序正确设置。 |