Apache Pulsar 的 Spring Cloud Stream 绑定器

Spring for Apache Pulsar 提供了用于 Spring Cloud Stream 的绑定器,我们可以使用它来构建使用发布/订阅范式的事件驱动微服务。在本节中,我们将了解此绑定器的基本信息。spring-doc.cadn.net.cn

用法

要使用 Spring Cloud Stream 的 Apache Pulsar Binder,需要在应用程序中包含以下依赖项。spring-doc.cadn.net.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

概述

Apache Pulsar 的 Spring Cloud Stream 绑定器使应用程序能够专注于业务逻辑,而不是处理管理维护 Pulsar 的底层细节。 绑定器为应用程序开发人员处理所有这些细节。 Spring Cloud Stream 基于 Spring Cloud Function 提供了强大的编程模型,该模型允许应用开发人员使用函数式风格编写复杂的事件驱动应用程序。 应用程序可以从中间件无关的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目的地。 Spring Cloud Stream 是基于 Spring Boot 构建的,当使用 Spring Cloud Stream 编写事件驱动微服务时,您实际上是在编写一个 Boot 应用程序。 这是一个简单的 Spring Cloud Stream 应用程序。spring-doc.cadn.net.cn

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上述示例应用程序是一个完整的Spring Boot应用程序,值得进行一些解释。然而,在第一次查看时,你可以看到这仅仅是一些普通的Java代码以及几个Spring和Spring Boot注解。
我们有三个Bean方法在这里——一个java.util.function.Supplier、一个java.util.function.Function,最后还有一个java.util.function.Consumer
提供商生成以毫秒为单位的当前时间,函数采用此时间然后通过添加一些随机数据对其进行增强,接着消费者记录增强后的时间。spring-doc.cadn.net.cn

为简洁起见,我们省略了所有的导入语句,但整个应用程序中没有任何与 Spring Cloud Stream 相关的内容。 如何使其成为一个与 Apache Pulsar 交互的 Spring Cloud Stream 应用程序? 您必须在应用程序中包含上述绑定器依赖项。 添加该依赖项后,您必须提供以下配置属性。spring-doc.cadn.net.cn

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

至此,上述基于Spring Cloud Stream的Spring Boot应用程序已成为一个端到端事件驱动的应用程序。由于我们已经在类路径中包含了 Pulsar 绑定器,应用程序将与 Apache Pulsar 进行交互。如果应用程序中只有一个函数,则我们不需要告诉 Spring Cloud Stream 启用该函数执行,因为它默认会这样做。如果应用程序中有多个这样的函数,如我们的示例所示,我们需要指示 Spring Cloud Stream 我们希望激活哪些函数。在我们的例子中,我们需要全部激活它们,并通过spring.cloud.function.definition属性来实现。默认情况下,bean 名称会成为 Spring Cloud Stream 绑定名称的一部分。绑定是Spring Cloud Stream中的一个基本抽象概念,框架使用该概念与中间件目标进行通信。Spring Cloud Stream 所做的几乎所有事情都是通过具体的绑定来实现的。提供者只有输出绑定;函数有输入和输出绑定,而使用者只有输入绑定。让我们以我们的提供商 bean 为例 — timeSupplier.spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

此提供商的默认绑定名称为timeSupplier-out-0。同样,timeProcessor函数的默认绑定名称在传入时为timeProcessor-in-0,传出时为timeProcessor-out-0。有关如何更改默认绑定名称的详细信息,请参阅 Spring Cloud Stream 参考文档。在大多数情况下,使用默认绑定名称就足够了。如上所示,我们将目标设置在绑定名称上。如果未提供目标,则绑定名称将成为目的地的值,如timeSupplier-out-0的情况。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

运行上述应用程序时,您会看到提供商每秒执行一次,然后由函数消耗并增强记录器消费者所花费的时间。spring-doc.cadn.net.cn

绑定应用程序中的消息转换

在上述示例应用程序中,我们未提供消息转换的模式信息。这是因为默认情况下,Spring Cloud Stream 使用其通过 Spring Messaging 项目建立的消息传递支持来实现消息转换机制。除非特别指定,否则 Spring Cloud Stream 在入站和出站绑定上均使用 application/json 作为 content-type 进行消息转换。在出站时,数据被序列化为 byte[],,然后 Pulsar 绑定使用 Schema.BYTES 将其通过网络发送到 Pulsar 主题。同样,在入站时,数据从 Pulsar 主题以 byte[] 的形式接收,然后使用适当的消息转换器转换为目标类型。spring-doc.cadn.net.cn

在 Pulsar 中使用 Pulsar Schema 进行原生转换

尽管默认使用框架提供的消息转换,Spring Cloud Stream 允许每个绑定器确定如何转换消息。</p><p>假设应用程序选择采用这种方式。在这种情况下,Spring Cloud Stream 将避免使用任何由 Spring 提供的消息转换设施,并传递接收到或产生的数据。</p><p>这个功能在 Spring Cloud Stream 中被称为生产者的原生编码和消费者的原生解码。这意味着编码和解码将在目标中间件(在我们的案例中是 Apache Pulsar)上本地发生。</p><p>对于上述应用程序,我们可以使用以下配置来绕过框架转换并使用原生编码和解码。spring-doc.cadn.net.cn

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

在生产者端启用原生编码的属性是核心Spring Cloud Stream中的绑定级别属性。您可以在生产者绑定上设置它- spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding 并将其设置为 true. 。同样,对于消费者绑定使用 - spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding 并将其设置为 true. 。如果我们决定使用原生编码和解码,在Pulsar的情况下,我们需要设置相应的模式和底层消息类型信息。此信息作为扩展绑定属性提供。如上配置所示,这些属性分别是用于模式信息的spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type 和实际目标类型的spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type。如果您需要同时指定键和值的目标类型,则可以使用message-key-typemessage-value-type来指定它们。spring-doc.cadn.net.cn

当省略schema-type属性时,将查阅任何已配置的自定义架构映射。

消息头转换

每个消息通常都有需要在 Pulsar 和 Spring 消息传递通过 Spring Cloud Stream 输入和输出绑定之间传输的头信息。 为了支持这种传输,框架处理必要的消息头转换。spring-doc.cadn.net.cn

自定义标题映射

流式绑定程序默认使用一个头映射器,可通过提供您自己的PulsarHeaderMapper bean来覆盖它。spring-doc.cadn.net.cn

在下面的示例中,配置了一个JSON头映射器,该映射器的功能包括:spring-doc.cadn.net.cn

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

在绑定器中使用Pulsar属性

绑定器使用来自 Spring for Apache Pulsar 框架的基本组件来构建其生产者和消费者绑定。
由于基于绑定器的应用程序是 Spring Boot 应用程序,因此默认情况下,绑定器会使用 Spring Boot 自动配置功能进行 Spring for Apache Pulsar。
因此,在核心框架级别可用的所有 Pulsar Spring Boot 属性也可通过绑定器获得。
例如,您可以使用带有前缀 spring.pulsar.producer…​spring.pulsar.consumer…​ 等的属性。
此外,您还可以在绑定器级别设置这些 Pulsar 属性。
例如,这也将适用 - spring.cloud.stream.pulsar.binder.producer…​spring.cloud.stream.pulsar.binder.consumer…​spring-doc.cadn.net.cn

以上任一方法均可,但使用这些属性时,它们会应用于整个应用程序。
如果您的应用程序中有多个函数,则所有函数都会获得相同的属性。
您也可以在扩展绑定属性级别设置这些Pulsar属性以解决此问题。
扩展绑定属性是在绑定本身上应用的。
例如,如果您有输入和输出绑定,并且两者都需要一组单独的Pulsar属性,则必须在扩展绑定上进行设置。
生产者绑定的模式为spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​
同样,消费者绑定的模式为spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​
这样,您可以在同一应用程序中的不同绑定上应用一组单独的Pulsar属性。spring-doc.cadn.net.cn

最高优先级的是扩展绑定属性。
应用绑定器中属性的优先级顺序为 extended binding properties → binder properties → Spring Boot properties.(从高到低)。spring-doc.cadn.net.cn

以下是有关Pulsar绑定提供的属性的更多资源,可以参考这些资源来查找更多信息。spring-doc.cadn.net.cn

Pulsar 生产者绑定配置。 这些属性需要 spring.cloud.stream.bindings.<binding-name>.producer 前缀。 所有 Spring Boot 提供的 Pulsar 生产者属性 也通过此配置类可用。spring-doc.cadn.net.cn

Pulsar消费者绑定配置。这些属性需要spring.cloud.stream.bindings.<binding-name>.consumer前缀。
所有Spring Boot提供的Pulsar消费者属性也通过此配置类可用。spring-doc.cadn.net.cn

有关常见Pulsar绑定器特定配置属性,请参阅。这些属性需要spring.cloud.stream.pulsar.binder前缀。
上述指定的生产者和消费者属性(包括Spring Boot属性)可以使用spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer前缀在绑定器中使用。spring-doc.cadn.net.cn

Pulsar 主题配置器

Spring Cloud Stream 的 Apache Pulsar 绑定器自带了用于 Pulsar 主题的开箱即用预置程序。
运行应用程序时,如果缺少必要的主题,Pulsar 将为您创建这些主题。
但是,这是一个基本的非分区主题,如果您想要高级功能(如创建分区主题),可以依赖绑定器中的主题预置程序。
Pulsar 主题预置程序使用 PulsarAdministration 来自框架,该框架使用 PulsarAdminBuilder.
因此,除非您在默认服务器和端口上运行 Pulsar,否则需要设置 spring.pulsar.administration.service-url 属性。spring-doc.cadn.net.cn

创建主题时指定分区数量

创建主题时,可以通过两种方式设置分区数量。 首先,可以使用属性 spring.cloud.stream.pulsar.binder.partition-count 在绑定器级别进行设置。 如上所述,采用这种方式将使应用程序创建的所有主题继承此属性。 假设您希望在绑定级别对分区设置进行细粒度控制。 在这种情况下,您可以使用格式 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 对每个绑定设置 partition-count 属性。 这样,同一应用程序中不同函数创建的各种主题将根据应用程序需求具有不同的分区数。spring-doc.cadn.net.cn