对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

适用于 Apache Pulsar 的 Spring Cloud Stream Binder

Spring for Apache Pulsar 为 Spring Cloud Stream 提供了一个绑定器,我们可以使用它来使用发布-订阅范式构建事件驱动的微服务。在本节中,我们将介绍此绑定器的基本细节。spring-doc.cadn.net.cn

用法

我们需要在您的应用程序上包含以下依赖项,以便将 Apache Pulsar 绑定器用于 Spring Cloud Stream。spring-doc.cadn.net.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

概述

Apache Pulsar 的 Spring Cloud Stream 绑定器允许应用程序专注于业务逻辑,而不是处理管理和维护 Pulsar 的较低级别的细节。绑定器为应用程序开发人员处理所有这些细节。Spring Cloud Stream 带来了一个基于 Spring Cloud Function 的强大编程模型,允许应用程序开发人员使用函数式风格编写复杂的事件驱动应用程序。应用程序可以从中间件中立的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目标。Spring Cloud Stream 构建在 Spring Boot 之上,当使用 Spring Cloud Stream 编写事件驱动的微服务时,您实际上是在编写一个 Boot 应用程序。这是一个简单的 Spring Cloud Stream 应用程序。spring-doc.cadn.net.cn

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上面的示例应用程序是一个成熟的 Spring Boot 应用程序,值得一些解释。但是,在第一遍中,您可以看到这只是普通的 Java 和一些 Spring 和 Spring Boot 注释。我们有三个Bean这里的方法 - ajava.util.function.Supplier一个java.util.function.Function,最后是java.util.function.Consumer. 提供商以毫秒为单位生成当前时间,该函数占用这段时间,然后通过添加一些随机数据来增强它,然后消费者记录增强的时间。spring-doc.cadn.net.cn

为了简洁起见,我们省略了所有导入,但整个应用程序中没有特定于 Spring Cloud Stream 的内容。它如何成为与 Apache Pulsar 交互的 Spring Cloud Stream 应用程序?您必须在应用程序中包含绑定器的上述依赖项。添加该依赖项后,您必须提供以下配置属性。spring-doc.cadn.net.cn

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

这样,上面的 Spring Boot 应用程序就变成了一个基于 Spring Cloud Stream 的端到端事件驱动应用程序。因为我们在类路径上有 Pulsar 绑定器,所以应用程序与 Apache Pulsar 交互。如果应用程序中只有一个函数,那么我们不需要告诉 Spring Cloud Stream 激活该函数进行执行,因为它默认这样做。如果应用程序中有多个这样的函数,如我们的示例所示,我们需要指示 Spring Cloud Stream 我们想要激活哪些函数。在我们的例子中,我们需要激活所有这些函数,我们通过spring.cloud.function.definition财产。 默认情况下,bean 名称成为 Spring Cloud Stream 绑定名称的一部分。绑定是 Spring Cloud Stream 中一个从根本上抽象的概念,框架使用它与中间件目标进行通信。Spring Cloud Stream 所做的几乎所有事情都发生在具体绑定上。提供商只有一个输出绑定;函数有输入和输出绑定,消费者只有输入绑定。让我们以我们的提供商 bean 为例 -timeSupplier.此提供商的默认绑定名称为timeSupplier-out-0. 同样,默认绑定名称timeProcessor函数将是timeProcessor-in-0在入站和timeProcessor-out-0在出站。有关如何更改默认绑定名称的详细信息,请参阅 Spring Cloud Stream 参考文档。在大多数情况下,使用默认绑定名称就足够了。我们在绑定名称上设置了目标,如上所示。如果未提供目的地,则绑定名称将成为目标的值,如timeSupplier-out-0.spring-doc.cadn.net.cn

在运行上述应用程序时,您应该会看到提供商每秒执行一次,然后被函数消耗并增强记录器消费者消耗的时间。spring-doc.cadn.net.cn

基于活页夹的应用程序中的消息转换

在上面的示例应用程序中,我们没有提供用于消息转换的架构信息。 这是因为,默认情况下,Spring Cloud Stream 使用其消息转换机制,使用通过 Spring Messaging 项目在 Spring Framework 中建立的消息传递支持。 除非指定,否则 Spring Cloud Stream 使用application/json作为content-type用于入站和出站绑定上的消息转换。 在出站时,数据序列化为byte[],然后 Pulsar 活页夹使用Schema.BYTES通过线路将其发送到 Pulsar 主题。 同样,在入站时,数据将作为byte[]从 Pulsar 主题,然后使用适当的消息转换器转换为目标类型。spring-doc.cadn.net.cn

使用 Pulsar Schema 在 Pulsar 中使用本机转换

尽管默认是使用框架提供的消息转换,但 Spring Cloud Stream 允许每个 Binder 确定应如何转换消息。 假设应用程序选择走这条路。在这种情况下,Spring Cloud Stream 会避免使用任何 Spring 提供的消息转换工具,并传递它接收或生成的数据。 Spring Cloud Stream 中的此功能在生产者端称为原生编码,在消费者端称为原生解码。这意味着编码和解码本机发生在目标中间件上,在我们的例子中,在 Apache Pulsar 上。 对于上面的应用,我们可以使用以下配置来绕过框架转换,使用原生的编解码。spring-doc.cadn.net.cn

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

在生产者端启用本机编码的属性是来自核心 Spring Cloud Stream 的绑定级别属性。 你在生产者绑定上设置它 -spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding并将其设置为true.同样,使用 -spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding对于使用者绑定,并将其设置为true.如果我们决定使用原生编码和解码,在 Pulsar 的情况下,我们需要设置相应的 schema 和底层消息类型信息。 此信息作为扩展绑定属性提供。 正如您在上面的配置中看到的,属性是 -spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type用于架构信息和spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type对于实际目标类型。 如果消息上同时包含键和值,则可以使用message-key-typemessage-value-type以指定其目标类型。spring-doc.cadn.net.cn

schema-type属性被省略。

邮件头转换

每条消息通常都有标头信息,当消息通过 Spring Cloud Stream 输入和输出绑定在 Pulsar 和 Spring Messaging 之间遍历时,需要携带这些信息。 为了支持这种遍历,框架处理必要的消息头转换。spring-doc.cadn.net.cn

自定义标头映射器

Pulsar 绑定器配置了一个默认的标头映射器,可以通过提供你自己的PulsarHeaderMapper豆。spring-doc.cadn.net.cn

在以下示例中,JSON 标头映射器配置为:spring-doc.cadn.net.cn

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

在绑定器中使用 Pulsar 属性

该绑定器使用 Spring for Apache Pulsar 框架中的基本组件来构建其生产者和消费者绑定。 由于基于 binder 的应用程序是 Spring Boot 应用程序,因此默认情况下,binder 使用 Spring for Apache Pulsar 的 Spring Boot 自动配置。 因此,核心框架级别可用的所有 Pulsar Spring Boot 属性也可以通过 binder 获得。 例如,可以使用带有前缀spring.pulsar.producer…​,spring.pulsar.consumer…​等。 此外,您还可以在绑定器级别设置这些 Pulsar 属性。 例如,这也有效 -spring.cloud.stream.pulsar.binder.producer…​spring.cloud.stream.pulsar.binder.consumer…​.spring-doc.cadn.net.cn

上述任何一种方法都可以,但是在使用此类属性时,它将应用于整个应用程序。 如果应用程序中有多个函数,则它们都获得相同的属性。 您还可以在扩展绑定属性级别设置这些 Pulsar 属性来解决此问题。 扩展绑定属性应用于绑定本身。 例如,如果你有一个输入和输出绑定,并且两者都需要一组单独的 Pulsar 属性,则必须在扩展绑定上设置它们。 生产者绑定的模式是spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​. 同样,对于消费者绑定,模式为spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​. 这样,您可以为同一应用程序中的不同绑定应用一组单独的 Pulsar 属性。spring-doc.cadn.net.cn

最高优先级是扩展绑定属性。 在活页夹中应用属性的优先顺序是extended binding properties → binder properties → Spring Boot properties.(从最高到最低)。spring-doc.cadn.net.cn

以下是一些资源,可用于查找有关通过 Pulsar 活页夹提供的属性的更多信息。spring-doc.cadn.net.cn

Pulsar 生产者绑定配置。 这些属性需要spring.cloud.stream.bindings.<binding-name>.producer前缀。 所有 Spring Boot 提供的 Pulsar 生产者属性也可通过此配置类获得。spring-doc.cadn.net.cn

Pulsar 消费者绑定配置。 这些属性需要spring.cloud.stream.bindings.<binding-name>.consumer前缀。 所有 Spring Boot 提供的 Pulsar 消费者属性也可以通过此配置类获得。spring-doc.cadn.net.cn

有关常见的 Pulsar 绑定器特定配置属性,请参阅。这些属性需要前缀spring.cloud.stream.pulsar.binder. 上面指定的生产者和消费者属性(包括 Spring Boot 属性)可以在绑定器中使用spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer前缀。spring-doc.cadn.net.cn

Pulsar 主题配置器

适用于 Apache Pulsar 的 Spring Cloud Stream 绑定器附带了一个开箱即用的 Pulsar 主题配置器。 在运行应用程序时,如果缺少必要的主题,Pulsar 将为您创建主题。 但是,这是一个基本的非分区主题,如果您想要创建分区主题等高级功能,您可以依赖活页夹中的主题配置程序。 Pulsar 主题配置器使用PulsarAdministration来自框架,该框架使用PulsarAdminBuilder.因此,您需要将spring.pulsar.administration.service-url属性,除非您在默认服务器和端口上运行 Pulsar。spring-doc.cadn.net.cn

创建主题时指定分区计数

创建主题时,可以通过两种方式设置分区计数。 首先,您可以使用属性在活页夹级别设置它spring.cloud.stream.pulsar.binder.partition-count. 正如我们在上面看到的,这样做将使应用程序创建的所有主题都继承此属性。 假设你希望在绑定级别进行精细控制以设置分区。 在这种情况下,您可以将partition-count使用格式的每个绑定的属性spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count. 这样,同一应用程序中不同函数创建的各种主题将根据应用程序需求具有不同的分区。spring-doc.cadn.net.cn