<span>模式注册表</span>
介绍
当组织具有基于发布/订阅体系结构的消息传递架构,多个生产者和使用者微服务相互通信时,通常需要所有这些微服务都同意建立在模式为基础的合同上。当此类架构需要演进以适应新的业务需求时,仍然需要继续使用现有的组件。</p> Spring Cloud Stream 提供了支持,使用此独立模式,可以实现架构和应用程序的完全解耦。 </p>Spring Cloud Stream对avro-based模式注册表客户端提供了支持,它本质上提供了解析器,可与模式注册表进行通信,在消息转换期间协调模式。Spring Cloud Stream 提供的架构演变支持既可以与上述独立架构注册表一起使用,也可以与专门与 Apache Kafka 一起使用的 Confluent 提供的架构注册表一起使用。
Spring Cloud Stream架构注册表概述
Spring Cloud Stream Schema Registry 提供了架构演进支持,使数据可以在时间上进行演变,并且仍然可以与较旧或较新 的生产者和使用者一起使用,反之亦然。大多数序列化模型,尤其是那些旨在跨不同平台和语言实现可移植性的模型,都依赖于描述二进制有效负载中数据如何序列化的架构。为了序列化数据,然后进行解释,发送方和接收方都必须能够访问描述二进制格式的架构。在某些情况下,可以从序列化时的payload类型推断出模式,也可以从反序列化时的目标类型推断出模式。但是,许多应用程序从能够访问显式模式中受益,该模式描述二进制数据格式。模式注册表使您能够以文本格式(通常是JSON)存储模式信息,并使这些信息可供需要以二进制格式接收和发送数据的各种应用程序使用。***
-
一个主体,它是模式的逻辑名称
-
Schema 版本
-
架构格式,描述数据的二进制格式
Spring Cloud Stream架构注册表提供以下组件
-
独立模式注册表服务器
By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
-
具有通过与模式注册表通信来执行消息封送处理的模式注册表客户端。
Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.
Schema Registry 客户端
客户端侧抽象类用于与模式注册表服务器进行交互,即SchemaRegistryClient接口,其结构如下所示:
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream 提供了与自己的模式服务器以及与 Confluent 模式注册表交互的现成实现。
可以使用@EnableSchemaRegistryClient配置Spring Cloud Stream模式注册表的客户端,如下所示:
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
默认的转换器被优化为不仅缓存远程服务器的架构,而且还缓存parse()和toString()方法,这两个方法相当昂贵。由于这个原因,它使用了一个不缓存响应的2>。如果你打算改变这种行为,你可以在你的代码中直接使用客户端,并将其覆盖到所需的输出。为此,你必须在你的application.properties文件中添加属性 |
<h2>模式注册表客户端属性</h2>
Spring 框架开发人员指南支持以下属性:
spring.cloud.stream.schemaRegistryClient.endpoint-
schema-server 的位置。 当设置此属性时,请使用包含协议(0 或 1)、端口和上下文路径的完整 URL。
- 默认
spring.cloud.stream.schemaRegistryClient.cached-
客户端是否应缓存schema服务器的响应。 通常设置为
false,因为缓存在消息转换器中处理。 使用schema注册表客户端的客户端应将此设置为true。 - 默认
-
false
Avro模式注册表客户端消息转换器
对于在应用程序上下文中注册了<code>SchemaRegistryClient</code> bean的应用程序,Spring Cloud Stream会自动为模式管理配置一个Apache Avro消息转换器。这可以简化模式演变,因为接收消息的应用程序可以轻松访问写入模式,该模式可以与自己的读取模式进行协调。
对于出站消息,如果绑定内容类型的设置为application/*+avro,则会激活MessageConverter,如以下示例所示:
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
在出站转换期间,消息转换器尝试推断每个出站消息的模式(基于其类型),并根据有效负载类型通过使用SchemaRegistryClient将其注册到主题。如果找到相同的模式,则检索对该模式的引用。如果没有找到,则注册模式,并提供新版本号。消息使用以下方案contentType标头发送:application/[prefix].[subject].v[version]+avro,其中prefix是可配置的,subject是从有效负载类型派生的。
例如,类型为 User 的消息可能作为内容类型为 application/vnd.user.v2+avro 的二进制有效载荷发送,其中 user 是主题,2 是版本号。
接收消息时,转换器会从 incoming 消息的头部推断 schema 引用,并尝试检索它。该 schema 会在反序列化过程中作为 writer schema 使用。
Avro 架构注册表消息转换器属性
如果您通过设置spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro已启用Avro基于模式注册表客户端,则可以设置以下属性来自定义注册行为。
- spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
-
启用,以便转换器使用反射推断POJO的模式。
默认值:
false - spring.cloud.stream.schema.avro.readerSchema
-
Avro 通过查看写入程序模式(原始有效负载)和读取器模式(你的应用程序有效负载)来比较模式版本。有关更多信息,请参阅Avro 文档。如果设置,此选项会覆盖对架构服务器的所有查询,并使用本地架构作为读取器架构。默认值:1
- spring.cloud.stream.schema.avro.schemaLocations
-
使用此属性向模式服务器注册任何列于此属性中的
.avsc文件。默认值:
empty - spring.cloud.stream.schema.avro.prefix
-
内容类型头的前缀。
默认值:
vnd - spring.cloud.stream.schema.avro.subjectNamingStrategy
-
确定在模式注册表中注册 Avro 架构的主体名称。提供了两个实现,
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy,其中主题是架构名称;以及org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy,它使用 Avro 架构命名空间和名称返回完全限定的主题。可以通过实现org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy来创建自定义策略。默认值:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy - spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
-
忽略任何架构注册表通信。在单元测试中,这很有用,这样当运行单元测试时,它不会不必要地尝试连接到架构注册表服务器。
默认值:
false
Apache Avro消息转换器
Spring Cloud Stream通过其spring-cloud-stream-schema-registry-client模块为基于模式的消息转换器提供了支持。 目前,对于模式驱动的消息传递,开箱即用的序列化格式只有Apache Avro,未来版本中会添加更多格式。
spring-cloud-stream-schema-registry-client模块包含可用于Apache Avro序列化的两种类型的消息转换器:
-
使用序列化或反序列化的对象的类信息,或者在启动时位置已知的架构进行转换。
-
使用架构注册表的转换器。它们在运行时定位架构,并动态注册新架构,以便随着域对象的发展而发展。
转换器与模式支持
AvroSchemaMessageConverter支持通过使用预定义的模式或使用类中的模式信息(通过反射或包含在SpecificRecord中)来序列化和反序列化消息。如果提供自定义转换器,那么不会创建默认的AvroSchemaMessageConverter bean。下面的示例显示了一个自定义转换器:
要使用自定义转换器,可以简单地将其添加到应用程序上下文中,可选地指定一个或多个MimeTypes与之关联。默认的MimeType是application/avro。
如果目标类型的转换是 GenericRecord,必须设置 schema。
本示例演示如何通过注册Apache Avro MessageConverter 转换器来配置接收器应用的转换功能。请注意,此示例中的 MIME 类型值为 avro/bytes,而不是默认值 application/avro。
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
相反地,下面的应用程序会用一个预先定义好的模式(在类路径中找到)去注册一个转换器:
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
<span>模式注册表服务器</span>
Spring Cloud Stream 提供了架构注册表服务器的实现。 要使用它,您可以下载最新的spring-cloud-stream-schema-registry-server版本并将其作为独立应用程序运行:
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
|
您可以将模式注册表嵌入到您现有的Spring Boot Web应用程序中。 为此,向项目添加 以下示例显示了一个启用模式注册表的Spring Boot应用程序:
|
The spring.cloud.stream.schema.server.path property 可以用于控制模式服务器的根路径(尤其是在嵌入到其他应用程序中时)。
The spring.cloud.stream.schema.server.allowSchemaDeletion 布尔属性启用模式的删除。默认情况下,此功能是禁用的。
模式注册表服务器使用关系数据库来存储模式。默认情况下,它使用嵌入式数据库。您可以通过使用
Schema Registry Server API
该模式注册表服务器API由以下操作组成:
-
POST /— 见Registering a New Schema -
GET /{subject}/{format}/{version}— 见Retrieving an Existing Schema by Subject, Format, and Version -
GET /{subject}/{format}— 见Retrieving an Existing Schema by Subject and Format -
GET /schemas/{id}— 见Retrieving an Existing Schema by ID -
DELETE /{subject}/{format}/{version}— 见Deleting a Schema by Subject, Format, and Version -
DELETE /schemas/{id}— 见Deleting a Schema by ID -
DELETE /{subject}— 见Deleting a Schema by Subject
注册新模式
要注册新架构,请向 / 终端发送 POST 请求。
The / 接受一个包含以下字段的 JSON 负载:
-
subject: The schema subject -
format: 该模式格式 -
definition: The schema definition
它的响应是一个JSON中的模式对象,具有以下字段:
-
id: 该模式的ID -
subject: The schema subject -
format: 该模式格式 -
version: 架构版本号 -
definition: The schema definition
通过主题、格式和版本检索现有模式
要检索按主题、格式和版本存在的架构,请向{subject}/{format}/{version}端点发送GET请求。
它的响应是一个JSON中的模式对象,具有以下字段:
-
id: 该模式的ID -
subject: The schema subject -
format: 该模式格式 -
version: 架构版本号 -
definition: The schema definition
通过主题和格式检索现有模式
按主题和格式检索现有模式,向 /subject/format 端点发送 GET 请求。
其响应是一个模式列表,每个模式对象都以JSON格式表示,具有以下字段:
-
id: 该模式的ID -
subject: The schema subject -
format: 该模式格式 -
version: 架构版本号 -
definition: The schema definition
检索现有模式ID
To retrieve a schema by its ID, send a GET request to the /schemas/{id} endpoint.
它的响应是一个JSON中的模式对象,具有以下字段:
-
id: 该模式的ID -
subject: The schema subject -
format: 该模式格式 -
version: 架构版本号 -
definition: The schema definition
删除模式根据主题
DELETE /{subject}
删除其主题的现有模式。
This note applies to users of Spring Cloud Stream 1.1.0.RELEASE only.
Spring Cloud Stream 1.1.0.RELEASE used the table name, schema, for storing Schema objects. Schema is a keyword in a number of database implementations.
To avoid any conflicts in the future, starting with 1.1.1.RELEASE, we have opted for the name SCHEMA_REPOSITORY for the storage table.
Any Spring Cloud Stream 1.1.0.RELEASE users who upgrade should migrate their existing schemas to the new table before upgrading. |
使用Confluent的模式注册表
默认配置会创建一个DefaultSchemaRegistryClient bean。 如果您想使用 Confluent 架构注册表,您需要创建一个类型为ConfluentSchemaRegistryClient的 bean,它会替代框架默认配置的 bean。下面的示例展示了如何创建这样的 bean:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
| ConfluentSchemaRegistryClient在版本4.0.0的Confluent平台中进行了测试。 |
模式注册过程(序列化)
注册流程的第一部分是从通过通道发送的有效负载中提取模式。
Avro 类型,例如SpecificRecord或GenericRecord已经包含一个模式,可以从实例立即检索到。
在 POJO 的情况下,如果将spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled属性设置为true(默认值),则可以推断出模式。
获得架构后,转换器会从远程服务器加载其元数据(版本)。首先,它查询本地缓存。如果没有找到结果,则将数据提交到服务器,服务器会回复版本信息。转换器始终将结果缓存在本地,以避免为每个需要序列化的消息重复查询架构服务器所带来的开销。
有了模式版本信息,转换器会将消息的contentType标题设置为携带版本信息——例如:application/vnd.user.v1+avro。
模式解析过程(反序列化)
当读取包含版本信息的消息(即带有类似于Schema Registration Process (Serialization)下描述方案的contentType标题)时,转换器会查询模式服务器以获取消息的写入者模式。
一旦找到传入消息的正确模式,它就会检索阅读者模式,并通过使用Avro的模式解析支持将其读入阅读者定义(设置默认值和任何缺失的属性)。
| 您应该理解编写者模式(写入消息的应用程序)和读者模式(接收应用程序)之间的区别。 我们建议花一点时间阅读Avro术语并了解该过程。 Spring Cloud Stream始终获取编写者模式以确定如何读取消息。 如果要使Avro的模式演变支持正常工作,需要确保为您的应用程序正确设置了 |