概述
前言
Spring 数据集成之旅简史
Spring 的数据集成之旅始于 Spring Integration。凭借其编程模型,它提供了一致的开发人员体验来构建可以采用企业集成模式以与外部系统(例如数据库、消息代理等)连接的应用程序。
快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,开发基于 Spring 的独立生产级微服务变得无缝。
为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被放到一个新项目中。Spring Cloud Stream 诞生了。
借助 Spring Cloud Stream,开发人员可以:
-
单独构建、测试和部署以数据为中心的应用程序。
-
应用现代微服务体系结构模式,包括通过消息传递进行组合。
-
将应用程序职责与以事件为中心的思维解耦。事件可以表示及时发生的事情,下游消费者应用程序可以在不知道其起源或生产者身份的情况下对此做出反应。
-
将业务逻辑移植到消息代理(例如 RabbitMQ、Apache Kafka、Amazon Kinesis)上。
-
依靠框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。
-
还有很多......
快速入门
按照此三步指南,您可以在不到 5 分钟的时间内试用 Spring Cloud Stream,甚至在进入任何细节之前。
我们将向您展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自您选择的消息传递中间件的消息(稍后会详细介绍),并将收到的消息记录到控制台。
我们称之为LoggingConsumer
.
虽然不是很实用,但它很好地介绍了一些主要概念
和抽象,使消化本用户指南的其余部分变得更加容易。
这三个步骤如下:
使用 Spring Initializr 创建示例应用程序
要开始使用,请访问 Spring Initializr。从那里,您可以生成我们的LoggingConsumer
应用。为此,请执行以下作:
-
在“依赖项”部分中,开始键入
stream
. 当出现“云流”选项时,选择它。 -
开始输入“kafka”或“rabbit”。
-
选择“Kafka”或“RabbitMQ”。
基本上,您可以选择应用程序绑定到的消息传递中间件。 我们建议使用您已经安装的或对安装和运行感觉更舒服的那个。 此外,正如您从初始化器屏幕中看到的那样,您还可以选择其他一些选项。 例如,您可以选择 Gradle 作为构建工具,而不是 Maven(默认)。
-
在“项目”字段中,键入“logging-consumer”。
“项目”字段的值将成为应用程序名称。 如果您选择了 RabbitMQ 作为中间件,则您的 Spring Initializr 现在应该如下所示:

-
单击“生成项目”按钮。
这样做会将生成项目的压缩版本下载到您的硬盘驱动器。
-
将文件解压缩到要用作项目目录的文件夹中。
我们鼓励您探索 Spring Initializr 中可用的多种可能性。它允许您创建许多不同类型的 Spring 应用程序。 |
将项目导入 IDE
现在,您可以将项目导入 IDE。请记住,根据 IDE,您可能需要遵循特定的导入过程。例如,根据项目的生成方式(Maven 或 Gradle),您可能需要遵循特定的导入过程(例如,在 Eclipse 或 STS 中,您需要使用文件→导入→ Maven →现有 Maven 项目)。
导入后,项目不得有任何类型的错误。 也src/main/java
应包含com.example.loggingconsumer.LoggingConsumerApplication
.
从技术上讲,此时您可以运行应用程序的主类。它已经是一个有效的 Spring Boot 应用程序。但是,它什么都不做,所以我们想添加一些代码。
添加消息处理程序、构建和运行
修改com.example.loggingconsumer.LoggingConsumerApplication
类如下所示:
@SpringBootApplication
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
从前面的列表中可以看出:
-
我们正在使用函数式编程模型(参见 Spring Cloud 函数支持)将单个消息处理程序定义为
Consumer
. -
我们依靠框架约定将此类处理程序绑定到绑定器公开的输入目标绑定。
这样做还可以让您看到该框架的核心功能之一:它尝试自动将传入消息有效负载转换为类型Person
.
您现在拥有一个功能齐全的 Spring Cloud Stream 应用程序,它确实监听消息。从这里开始,为简单起见,我们假设您在第一步中选择了 RabbitMQ。假设您已安装并运行 RabbitMQ,您可以通过运行其main
方法。
您应该会看到以下输出:
--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
--- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
--- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
. . .
--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
. . .
--- [ main] c.e.l.LoggingConsumerApplication : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)
进入 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,向input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
.
这anonymous.CbMIwdkJSBO1ZoPDOtHtCg
part 表示组名称并生成,因此它在您的环境中必然会有所不同。
对于更可预测的内容,您可以通过将spring.cloud.stream.bindings.input.group=hello
(或任何你喜欢的名字)。
消息的内容应该是Person
类,如下所示:
{"name":"Sam Spade"}
然后,在控制台中,您应该会看到:
Received: Sam Spade
您还可以将应用程序构建并打包到引导 jar 中(通过使用./mvnw clean install
),然后使用java -jar
命令。
现在您有一个工作(尽管非常基本)的 Spring Cloud Stream 应用程序。
流数据上下文中的 Spring 表达式语言 (SpEL)
在本参考手册中,您将遇到许多可以使用 Spring 表达式语言 (SpEL) 的功能和示例。在使用它时了解某些限制非常重要。
SpEL 允许您访问当前消息以及您正在运行的应用程序上下文。
但是,了解 SpEL 可以看到什么类型的数据非常重要,尤其是在传入消息的上下文中。
消息以 byte[] 的形式从代理到达。然后将其转换为Message<byte[]>
通过活页夹,如您所见,消息的有效负载保持其原始形式。消息的标头是<String, Object>
,其中值通常是另一个基元或基元的集合/数组,因此是 Object。
这是因为 binder 不知道所需的输入类型,因为它无法访问用户代码(函数)。因此,活页夹有效地以邮件标题的形式递送了一个包含有效负载和一些可读元数据的信封,就像通过邮件递送的信件一样。
这意味着,虽然可以访问消息的有效负载,但您只能将其作为原始数据(即 byte[])访问。虽然开发人员要求能够以具体类型(例如 Foo、Bar 等)对有效负载对象的字段进行 SpEL 访问可能是很常见的,但您可以看到实现这是多么困难甚至不可能。
这是一个例子来演示这个问题;假设您有一个路由表达式,用于根据有效负载类型路由到不同的函数。此要求意味着有效负载从 byte[] 转换为特定类型,然后应用 SpEL。但是,为了执行这样的转换,我们需要知道要传递给转换器的实际类型,这来自函数的签名,我们不知道是哪个。解决此要求的更好方法是将类型信息作为消息头传递(例如,application/json;type=foo.bar.Baz
).您将获得一个清晰可读的字符串值,可以在一年内访问和评估该值,并且易于阅读的 SpEL 表达式。
此外,使用有效负载进行路由决策被认为是非常糟糕的做法,因为有效负载被认为是特权数据 - 数据只能由其最终接收者读取。同样,使用邮件递送的类比,您不希望邮递员打开您的信封并阅读信件的内容来做出一些递送决定。同样的概念也适用于此,尤其是在生成消息时相对容易包含此类信息时。它强制执行与要通过网络传输的数据的设计以及此类数据中的哪些部分可以被视为公共数据以及哪些数据具有特权相关的一定级别的纪律。
Spring Cloud Stream 简介
Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。 Spring Cloud Stream 基于 Spring Boot 构建,可创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。 它提供了来自多个提供商的中间件的约定优于配置的配置,引入了持久发布-订阅语义、消费者组和分区的概念。
加入spring-cloud-stream
依赖于应用程序的类路径,您可以立即获得连接
到由提供的spring-cloud-stream
binder(稍后会详细介绍),您可以实现您的函数
requirement,该要求由java.util.function.Function
.
以下列表显示了一个快速示例:
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class, args);
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
以下列表显示了相应的测试:
@SpringBootTest(classes = SampleApplication.class)
@Import({TestChannelBinderConfiguration.class})
class BootTestStreamApplicationTests {
@Autowired
private InputDestination input;
@Autowired
private OutputDestination output;
@Test
void contextLoads() {
input.send(new GenericMessage<byte[]>("hello".getBytes()));
assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
}
}
主要概念
Spring Cloud Stream 提供了许多抽象和原语,可以简化消息驱动的微服务应用程序的编写。本节概述了以下内容:
应用模式
Spring Cloud Stream 应用程序由中间件中立的核心组成。 应用程序通过在目的地之间建立绑定来与外界通信 由代码中的外部代理和输入/输出参数公开。经纪人特定详细信息 建立绑定所必需的由特定于中间件的 Binder 实现处理。

胖罐
Spring Cloud Stream 应用程序可以从 IDE 以独立模式运行以进行测试。 要在生产环境中运行 Spring Cloud Stream 应用程序,您可以使用为 Maven 或 Gradle 提供的标准 Spring Boot 工具创建可执行(或“胖”)JAR。有关更多详细信息,请参阅 Spring Boot 参考指南。
Binder 抽象
Spring Cloud Stream 为 Kafka 和 Rabbit MQ 提供了 Binder 实现。 该框架还包括一个测试绑定器,用于将应用程序集成测试为 spring-cloud-stream 应用程序。有关更多详细信息,请参阅测试部分。
Binder 抽象也是框架的扩展点之一,这意味着您可以在 Spring Cloud Stream 之上实现自己的 binder。
在如何从头开始创建 Spring Cloud Stream Binder 发布社区成员文档中
详细介绍,通过示例,实现自定义活页夹所需的一组步骤。
这些步骤也在Implementing Custom Binders
部分。
Spring Cloud Stream 使用 Spring Boot 进行配置,而 Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接到中间件。
例如,部署者可以在运行时动态选择外部目标(例如 Kafka 主题或 RabbitMQ 交换)和输入之间的映射
以及消息处理程序的输出(例如函数的输入参数及其返回参数)。
此类配置可以通过外部配置属性提供,并以 Spring Boot 支持的任何形式(包括应用程序参数、环境变量和application.yml
或application.properties
文件)。
在 Spring Cloud Stream 简介部分的 sink 示例中,将spring.cloud.stream.bindings.input.destination
application 属性设置为raw-sensor-data
导致它从raw-sensor-data
Kafka 主题或从绑定到raw-sensor-data
RabbitMQ 交换。
Spring Cloud Stream 会自动检测并使用在类路径上找到的绑定器。 您可以使用相同的代码使用不同类型的中间件。 为此,请在生成时包含不同的活页夹。 对于更复杂的用例,您还可以将多个绑定器与应用程序打包在一起,并让它在运行时选择绑定器(甚至是否为不同的绑定使用不同的绑定器)。
持久发布-订阅支持
应用程序之间的通信遵循发布-订阅模型,其中数据通过共享主题广播。 这可以在下图中看到,它显示了一组交互的 Spring Cloud Stream 应用程序的典型部署。

传感器向 HTTP 端点报告的数据将发送到名为raw-sensor-data
.
从目标开始,它由计算时间窗口平均值的微服务应用程序和将原始数据摄取到 HDFS(Hadoop 分布式文件系统)的另一个微服务应用程序独立处理。
为了处理数据,两个应用程序在运行时都将主题声明为其输入。
发布-订阅通信模型降低了生产者和消费者的复杂性,并允许将新应用程序添加到拓扑中,而不会中断现有流。 例如,在平均值计算应用程序的下游,您可以添加一个应用程序来计算最高温度值以进行显示和监控。 然后,您可以添加另一个应用程序来解释相同的平均值流以进行故障检测。 通过共享主题而不是点对点队列进行所有通信可以减少微服务之间的耦合。
虽然发布-订阅消息传递的概念并不新鲜,但 Spring Cloud Stream 采取了额外的步骤,使其成为其应用程序模型的约定优于配置的选择。 通过使用本机中间件支持,Spring Cloud Stream 还简化了跨不同平台的发布-订阅模型的使用。
消费者群体
虽然发布-订阅模型使通过共享主题连接应用程序变得容易,但通过创建给定应用程序的多个实例来扩展的能力同样重要。 这样做时,应用程序的不同实例被放置在竞争的使用者关系中,其中只有一个实例需要处理给定的消息。
Spring Cloud Stream 通过消费者组的概念对这种行为进行建模。
(Spring Cloud Stream 消费者组与 Kafka 消费者组类似并受到其启发。
每个使用者绑定都可以使用spring.cloud.stream.bindings.<bindingName>.group
属性来指定组名称。
对于下图所示的使用者,此属性将设置为spring.cloud.stream.bindings.<bindingName>.group=hdfsWrite
或spring.cloud.stream.bindings.<bindingName>.group=average
.

订阅给定目标的所有组都会收到已发布数据的副本,但每个组中只有一个成员从该目标接收给定消息。 默认情况下,当未指定组时,Spring Cloud Stream 会将应用程序分配给一个匿名且独立的单成员消费者组,该组与所有其他消费者组处于发布-订阅关系。
消费者类型
支持两种类型的使用者:
-
消息驱动(有时称为异步)
-
轮询(有时称为同步)
在 2.0 版之前,仅支持异步使用者。消息一可用,就会立即传递,并且有线程可用于处理它。
如果要控制消息的处理速率,则可能需要使用同步使用者。
耐久性
与 Spring Cloud Stream 约定优于配置的应用模式一致,消费者组订阅是持久的。 也就是说,绑定程序实现可确保组预订是持久的,并且一旦为组创建了至少一个预订,该组就会接收消息,即使这些消息是在组中的所有应用程序停止时发送的。
匿名订阅本质上是非持久的。 对于某些 Binder 实现(例如 RabbitMQ),可以具有非持久组订阅。 |
通常,在将应用程序绑定到给定目标时,最好始终指定使用者组。 纵向扩展 Spring Cloud Stream 应用程序时,必须为其每个输入绑定指定一个使用者组。 这样做可以防止应用程序的实例接收重复消息(除非需要该行为,这是不寻常的)。
分区支持
Spring Cloud Stream 支持在给定应用程序的多个实例之间对数据进行分区。 在分区场景中,物理通信介质(例如代理主题)被视为结构化为多个分区。 一个或多个生产者应用实例将数据发送到多个消费者应用实例,并确保由共同特征标识的数据由同一消费者实例处理。
Spring Cloud Stream 提供了一个通用的抽象,用于以统一的方式实现分区处理用例。 因此,无论代理本身是自然分区的(例如,Kafka)还是不分区(例如,RabbitMQ),都可以使用分区。

分区是有状态处理中的一个关键概念,确保所有相关数据一起处理至关重要(出于性能或一致性原因)。 例如,在时间窗口平均计算示例中,重要的是来自任何给定传感器的所有测量值都由同一应用程序实例处理。
要设置分区处理方案,必须同时配置数据生成端和数据消费端。 |
编程模型
要了解编程模型,您应该熟悉以下核心概念:
-
目标活页夹:负责提供与外部消息传递系统集成的组件。
-
绑定:外部消息传递系统与应用程序提供的消息生产者和消费者之间的桥接器(由目标绑定器创建)。
-
消息:生产者和使用者用于与目标绑定器(以及通过外部消息传递系统的其他应用程序)通信的规范数据结构。

目标活页夹
目标绑定器是 Spring Cloud Stream 的扩展组件,负责提供必要的配置和实现,以方便 与外部消息系统集成。 此集成负责连接、委派以及与生产者和消费者之间的消息路由、数据类型转换、 调用用户代码等。
活页夹处理了许多样板职责,否则这些职责将落在您的肩上。然而,要实现这一目标,活页夹仍然需要 有些帮助以用户提供的简约但所需的指令集的形式提供,这些指令通常以某种类型的绑定配置的形式出现。
虽然讨论所有可用的活页夹和绑定配置选项超出了本节的讨论范围(手册的其余部分广泛介绍了它们),但绑定作为一个概念确实需要特别注意。下一节将详细讨论它。
绑定
如前所述,绑定在外部消息传递系统(例如队列、主题等)和应用程序提供的生产者和消费者之间架起了一座桥梁。
以下示例显示了一个完全配置且正常运行的 Spring Cloud Stream 应用程序,该应用程序接收消息的有效负载
作为String
类型(请参阅内容类型协商部分),将其记录到控制台,并在转换为大写后向下游发送。
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class, args);
}
@Bean
public Function<String, String> uppercase() {
return value -> {
System.out.println("Received: " + value);
return value.toUpperCase();
};
}
}
上面的示例看起来与任何普通的 spring-boot 应用程序没有什么不同。它定义了类型为Function
就是这样。那么,它是如何成为 spring-cloud-stream 应用程序的呢?
它之所以成为 spring-cloud-stream 应用程序,仅仅是因为存在 spring-cloud-stream 和 binder 依赖项
以及类路径上的自动配置类,有效地将引导应用程序的上下文设置为 spring-cloud-stream 应用程序。
在这种情况下,类型为Supplier
,Function
或Consumer
被视为事实上的消息处理程序触发
绑定到提供的活页夹公开的目标,遵循某些命名约定,以及
规则以避免额外的配置。
绑定和绑定名称
绑定是一种抽象,它表示绑定器和用户代码公开的源和目标之间的桥梁, 这个抽象有一个名称,虽然我们尽力限制运行 spring-cloud-stream 应用程序所需的配置, 对于需要额外的每个绑定配置的情况,必须了解此类名称 () 。
在本手册中,您将看到配置属性的示例,例如spring.cloud.stream.bindings.input.destination=myQueue
.
这input
segment 是我们所说的绑定名称,它可以通过多种机制派生。
以下小节将描述 spring-cloud-stream 用于控制绑定名称的命名约定和配置元素。
如果您的绑定名称具有特殊字符,例如. 字符,您需要用括号 () 将绑定键括起来,然后将其包裹在 qoutes 中。
例如[] spring.cloud.stream.bindings."[my.output.binding.key]".destination . |
功能绑定名称
与以前版本的 spring-cloud-stream 中使用的基于注释的支持(旧版)所需的显式命名不同,函数式 编程模型在绑定名称方面默认使用简单的约定,从而大大简化了应用程序配置。 让我们看第一个例子:
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
在前面的示例中,我们有一个应用程序,其中包含一个充当消息处理程序的函数。作为Function
它有一个
输入和输出。
用于命名输入和输出绑定的命名约定如下:
-
输入-
<functionName> + -in- + <index>
-
输出-
<functionName> + -out- + <index>
这in
和out
对应于绑定的类型(例如输入或输出)。
这index
是输入或输出绑定的索引。对于典型的单输入/输出功能,它始终为 0,
因此,它仅与具有多个输入和输出参数的函数相关。
因此,例如,如果您想将此函数的输入映射到远程 destination(例如,topic、queue 等)称为“my-topic”,您可以使用以下属性执行此作:
--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic
请注意如何uppercase-in-0
用作属性名称中的段。同样的道理uppercase-out-0
.
描述性绑定名称
有时,为了提高可读性,你可能希望给绑定一个更具描述性的名称(例如“帐户”、“订单”等)。
另一种看待它的方式是,您可以将隐式绑定名称映射到显式绑定名称。你可以用spring.cloud.stream.function.bindings.<binding-name>
财产。
此属性还为依赖于基于自定义接口的现有应用程序提供迁移路径
需要显式名称的绑定。
例如
--spring.cloud.stream.function.bindings.uppercase-in-0=input
在前面的示例中,您映射并有效地重命名了uppercase-in-0
将名称绑定到input
.现在所有配置
属性可以引用input
binding name 代替(例如--spring.cloud.stream.bindings.input.destination=my-topic
).
虽然描述性绑定名称可以增强配置的可读性,但它们也会创建
通过将隐式绑定名称映射到显式绑定名称,这是另一个级别的误导。并且由于所有随后的
配置属性将使用显式绑定名称,您必须始终引用此“bindings”属性
关联它实际对应的函数。我们认为,对于大多数情况(功能成分除外)
这可能有点矫枉过正,因此,我们建议完全避免使用它,尤其是
由于不使用它,因此在 Binder Destination 和 Binding Name 之间提供了清晰的路径,例如spring.cloud.stream.bindings.uppercase-in-0.destination=sample-topic ,
其中您清楚地关联了uppercase function 设置为sample-topic 目的地。 |
有关属性和其他配置选项的更多信息,请参阅配置选项部分。
显式绑定创建
在上一节中,我们解释了如何由Function
,Supplier
或Consumer
由您的应用程序提供的 bean。但是,有时您可能需要显式创建绑定,其中绑定未绑定到任何函数。这通常是为了通过支持与其他框架的集成StreamBridge
.
Spring Cloud Stream 允许您通过以下方式显式定义输入和输出绑定spring.cloud.stream.input-bindings
和spring.cloud.stream.output-bindings
性能。 注意到属性名称中的复数允许您通过简单地用作分隔符来定义多个绑定。只需查看以下测试用例作为示例:;
@Test public void testExplicitBindings() { try (ConfigurableApplicationContext context = new SpringApplicationBuilder( TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class)) .web(WebApplicationType.NONE) .run("--spring.jmx.enabled=false", "--spring.cloud.stream.input-bindings=fooin;barin", "--spring.cloud.stream.output-bindings=fooout;barout")) { . . . } } @EnableAutoConfiguration @Configuration public static class EmptyConfiguration { }
如您所见,我们声明了两个输入绑定和两个输出绑定,而我们的配置没有定义任何函数,但我们能够成功创建这些绑定并访问它们相应的通道。
生成和使用消息
您可以通过简单地编写函数并将它们公开为@Bean
s. 您还可以使用基于 Spring Integration 注解的配置或基于 Spring Cloud Stream 注解的配置,尽管从 spring-cloud-stream 3.x 开始我们建议使用功能实现。
Spring Cloud 函数支持
概述
从 Spring Cloud Stream v2.1 开始,定义流处理程序和源的另一种替代方法是使用内置的支持 Spring Cloud Function,其中它们可以表示为 类型java.util.function.[Supplier/Function/Consumer]
.
要指定要绑定到绑定公开的外部目标的功能 Bean,
您必须提供spring.cloud.function.definition
财产。
如果您只有java.util.function.[Supplier/Function/Consumer] 您可以
跳过spring.cloud.function.definition 属性,因为这样的功能 bean 将被自动发现。然而
使用此类属性以避免任何混淆被认为是最佳实践。
有时这种自动发现可能会妨碍,因为类型为java.util.function.[Supplier/Function/Consumer] 除了处理消息之外,还可以用于其他目的,但它是自动发现和自动绑定的。
对于这些罕见的情况,可以通过提供spring.cloud.stream.function.autodetect 值设置为false . |
下面是应用程序将消息处理程序公开为java.util.function.Function
通过充当数据的使用者和生产者来有效支持直通语义。
@SpringBootApplication
public class MyFunctionBootApp {
public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class);
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}
在前面的示例中,我们定义了一个类型为java.util.function.Function
调用 toUpperCase 充当消息处理程序
其“输入”和“输出”必须绑定到提供的目标绑定器公开的外部目标。
默认情况下,“input”和“output”绑定名称将为toUpperCase-in-0
和toUpperCase-out-0
. 有关用于建立绑定名称的命名约定的详细信息,请参阅功能绑定名称部分。
以下是支持其他语义的简单函数式应用程序的示例:
下面是公开为java.util.function.Supplier
@SpringBootApplication
public static class SourceFromSupplier {
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}
下面是公开为java.util.function.Consumer
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
提供商(来源)
Function
和Consumer
在如何触发它们的调用时非常简单。它们是基于
在发送到它们绑定到的目的地的数据(事件)上。换句话说,它们是经典的事件驱动组件。
然而Supplier
在触发方面属于自己的类别。由于根据定义,它是数据的源(源),因此它不会订阅任何入站目的地,因此必须由其他一些机制触发。还有一个问题Supplier
实施,这可能是势在必行的,也可能是被动的,并且与触发此类提供商直接相关。
请考虑以下示例:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
前面的Supplier
bean 每当其get()
方法被调用。但是,谁调用此方法以及多久调用一次?该框架提供了一个默认的轮询机制(回答“谁”的问题),该机制将触发提供商的调用,默认情况下它会这样做每秒(回答“多久一次?换句话说,上述配置每秒生成一条消息,每条消息都发送到output
由绑定程序公开的目标。若要了解如何自定义轮询机制,请参阅轮询配置属性部分。
考虑一个不同的例子:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
前面的Supplier
bean 采用响应式编程风格。通常,与命令式提供商不同,它应该只触发一次,因为调用它的get()
方法生成(提供)连续的消息流,而不是单个消息。
该框架识别编程风格的差异,并保证此类提供商仅被触发一次。
但是,想象一下您想要轮询某些数据源并返回表示结果集的有限数据流的用例。 响应式编程风格是这样的提供商的完美机制。然而,鉴于产生的流的有限性, 此类提供商仍需要定期调用。
请考虑以下示例,该示例通过生成有限数据流来模拟此类用例:
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
bean 本身用PollableBean
注释(@Bean
),从而向框架发出信号,尽管实现
对于这样的提供商是被动的,它仍然需要进行轮询。
有一个splittable 属性定义在PollableBean 向此注释的后处理器发出信号
必须拆分注释组件生成的结果,并将其设置为true 默认情况下。这意味着
框架会将发送每个项目的返回拆分为单独的消息。如果不是
他想要的行为你可以将其设置为false 此时,该提供商将简单地返回
产生的通量而不分裂它。 |
提供商和螺纹加工
正如您现在所了解的那样,与Function 和Consumer ,由事件触发(它们有输入数据),Supplier 没有
任何输入,因此由不同的机制触发 - 轮询器,它可能具有不可预测的线程机制。虽然
线程机制大多数时候与函数的下游执行无关,在某些情况下可能会出现问题
特别是对于可能对线程亲和力有一定期望的集成框架。例如,Spring Cloud Sleuth 依赖于
跟踪存储在线程本地的数据。
对于这些情况,我们通过StreamBridge ,用户可以更好地控制线程机制。您可以获得更多详细信息
在将任意数据发送到输出(例如外部事件驱动源)部分。 |
消费者(响应式)
反应性的Consumer
有点特别,因为它有一个 void 返回类型,让框架没有引用可以订阅。
很可能你不需要写Consumer<Flux<?>>
,而是将其写为Function<Flux<?>, Mono<Void>>
调用then
运算符作为流上的最后一个运算符。
例如:
public Function<Flux<?>, Mono<Void>> consumer() {
return flux -> flux.map(..).filter(..).then();
}
但是,如果您确实需要编写一个显式的Consumer<Flux<?>>
,记得订阅传入的通量。
此外,请记住,在混合响应式函数和命令式函数时,相同的规则也适用于函数组合。
Spring Cloud Function 确实支持使用命令式组合响应式函数,但是您必须注意某些限制。
例如,假设您已经使用命令式消费者组成了响应式函数。
这种组合的结果是反应性Consumer
.但是,无法订阅本节前面讨论的此类消费者,
因此,只能通过使您的消费者响应式并手动订阅(如前所述)或将您的函数更改为命令式来解决此限制。
轮询配置属性
以下属性由 Spring Cloud Stream 公开,并以spring.integration.poller.
:
- 固定延迟
-
修复了默认轮询器的延迟(以毫秒为单位)。
默认值:1000L。
- 最大消息PerPoll
-
默认轮询程序的每个轮询事件的最大消息数。
默认值:1L。
- 克罗恩
-
Cron 表达式值。
默认值:无。
- 初始延迟
-
定期触发器的初始延迟。
默认值:0。
- 时间单位
-
要应用于延迟值的 TimeUnit。
默认值:MILLISECONDS。
例如--spring.integration.poller.fixed-delay=2000
将轮询器间隔设置为每两秒轮询一次。
按绑定轮询配置
上一节介绍了如何配置将应用于所有绑定的单个默认轮询器。虽然它非常适合微服务 spring-cloud-stream 模型,该模型专为每个微服务代表单个组件(例如提供商)而设计,因此默认轮询器配置就足够了,但在某些情况下, 您可能有多个组件需要不同的轮询配置
对于这种情况,请使用按绑定的方式配置轮询器。例如,假设您有一个输出绑定supply-out-0
.在这种情况下,您可以为此类
绑定使用spring.cloud.stream.bindings.supply-out-0.producer.poller..
前缀(例如spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000
).
将任意数据发送到输出(例如外部事件驱动源)
在某些情况下,实际的数据源可能来自不是活页夹的外部(外部)系统。例如, 数据源可能是经典的 REST 端点。我们如何将这样的源代码与 spring-cloud-stream 使用的功能机制桥接?
Spring Cloud Stream 提供了两种机制,让我们更详细地了解它们
在这里,对于这两个示例,我们将使用名为delegateToSupplier
绑定到根 Web 上下文,
通过 StreamBridge 机制委托传入请求进行流式传输。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
在这里,我们自动连接一个StreamBridge
bean 的 bean,它允许我们有效地将数据发送到输出绑定
将非流应用程序与 spring-cloud-stream 桥接。请注意,前面的示例没有任何
定义的源函数(例如,Supplier bean)使框架没有触发器来提前创建源绑定,这在配置包含函数 bean 的情况下是典型的。这很好,因为StreamBridge
将启动输出绑定的创建(以及
目标自动配置(如有必要)用于首次调用其时不存在的绑定send(..)
作缓存它
后续重用(有关更多详细信息,请参阅 StreamBridge 和动态目标)。
但是,如果您想在初始化(启动)时预先创建输出绑定,则可以从spring.cloud.stream.output-bindings
属性,您可以在其中声明源的名称。
提供的名称将用作创建源绑定的触发器。
您可以使用来表示多个源(多个输出绑定)
(例如,;
--spring.cloud.stream.output-bindings=foo;bar
)
另外,请注意streamBridge.send(..)
方法采用Object
对于数据。这意味着您可以发送 POJO 或Message
到它和它
在发送输出时将经历相同的例程,就好像它来自提供相同级别的任何函数或提供商一样
与功能一样的一致性。这意味着输出类型转换、分区等被尊重,就好像它来自函数产生的输出一样。
StreamBridge 和动态目的地
StreamBridge
也可用于与用例类似的输出目的地提前未知的情况
在路由 FROM 使用者部分中进行了描述。
让我们看一下示例
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, args);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("myDestination", body);
}
}
如您所见,前面的示例与上一个示例非常相似,只是通过spring.cloud.stream.output-bindings
属性(未提供)。
在这里,我们将数据发送到myDestination
name 不作为绑定存在。因此,此类名称将被视为动态目的地
如路由 FROM 使用者部分所述。
在前面的示例中,我们使用ApplicationRunner
作为外来来源来喂养溪流。
一个更实际的例子,其中外部源是 REST 端点。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
streamBridge.send("myBinding", body);
}
}
正如您在内部看到的delegateToSupplier
我们使用 StreamBridge 将数据发送到myBinding
捆绑。在这里,您还受益于
的动态特征StreamBridge
其中,如果myBinding
不存在,它将自动创建并缓存,否则将使用现有绑定。
如果存在许多动态目标,缓存动态目标(绑定)可能会导致内存泄漏。拥有一定程度的控制权
我们为默认缓存大小为 10 的输出绑定提供了一种自我逐出缓存机制。这意味着,如果动态目标大小超过该数字,则可能会逐出现有绑定,因此需要重新创建,这可能会导致轻微的性能下降。您可以通过spring.cloud.stream.dynamic-destination-cache-size 属性将其设置为所需值。 |
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/
通过展示两个示例,我们想强调该方法适用于任何类型的外国来源。
如果您使用的是 Solace PubSub+ 绑定器,则 Spring Cloud Stream 已保留scst_targetDestination 标头(可通过BinderHeaders.TARGET_DESTINATION检索),它允许将消息从其绑定的配置目标重定向到此标头指定的目标目标。这允许 Binder 管理发布到动态目标所需的资源,从而减轻框架的必要性,并避免上一个注释中提到的缓存问题。更多信息在这里。 |
使用 StreamBridge 输出内容类型
如有必要,还可以使用以下方法签名提供特定内容类型public boolean send(String bindingName, Object data, MimeType outputContentType)
.
或者,如果您将数据作为Message
,则其内容类型将得到尊重。
在 StreamBridge 中使用特定活页夹类型
Spring Cloud Stream 支持多个 binder 场景。例如,您可能从 Kafka 接收数据并将其发送到 RabbitMQ。
有关多个绑定器场景的更多信息,请参阅绑定器部分,特别是类路径上的多个绑定器
如果您计划使用 StreamBridge,并且在您的应用程序中配置了多个活页夹,您还必须告诉 StreamBridge
使用哪种活页夹。为此,还有两种变体send
方法:
public boolean send(String bindingName, @Nullable String binderType, Object data)
public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)
如您所见,您可以提供一个额外的论点 -binderType
,告诉 BindingService 在创建动态绑定时使用哪个绑定器。
对于以下情况spring.cloud.stream.output-bindings 属性,或者绑定已在不同的绑定器下创建,则binderType 参数将无效。 |
将信道拦截器与 StreamBridge 结合使用
因为StreamBridge
使用MessageChannel
要建立输出绑定,您可以在通过StreamBridge
.
由应用程序决定在哪些信道拦截器上应用StreamBridge
.
Spring Cloud Stream 不会将检测到的所有通道拦截器注入到StreamBridge
除非它们被注释为@GlobalChannelInterceptor(patterns = "*")
.
让我们假设您有以下两个不同的StreamBridge
绑定。
streamBridge.send("foo-out-0", message);
和
streamBridge.send("bar-out-0", message);
现在,如果您希望在两个StreamBridge
bindings,则可以声明以下内容GlobalChannelInterceptor
豆。
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
但是,如果您不喜欢上述全局方法,并且希望为每个绑定配备一个专用的拦截器,那么您可以执行以下作。
@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
和
@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
您可以灵活地使模式更加严格或根据您的业务需求进行定制。
通过这种方法,应用程序能够决定要注入哪些拦截器StreamBridge
而不是应用所有可用的拦截器。
StreamBridge 通过StreamOperations 包含所有send 方法StreamBridge .因此,应用程序可以选择使用StreamOperations .当涉及到StreamBridge 通过为StreamOperations 接口。 |
响应式函数支持
由于 Spring Cloud Function 是构建在 Project Reactor 之上的,因此您不需要做太多事情
在实现时从响应式编程模型中受益Supplier
,Function
或Consumer
.
例如:
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
return flux -> flux.map(val -> val.toUpperCase());
}
}
在选择响应式或命令式编程模型时,必须了解一些重要的事情。 完全响应式还是仅 API? 使用响应式 API 并不一定意味着您可以从此类 API 的所有响应式功能中受益。换句话说,背压和其他高级功能之类的东西只有在与兼容系统(例如响应式 Kafka 绑定器)一起使用时才有效。如果您使用的是常规 Kafka 或 Rabbit 或任何其他非响应式绑定器,您只能受益于响应式 API 本身的便利性,而不能从其高级功能中受益,因为流的实际源或目标不是响应式的。 错误处理和重试 在本手册中,您将看到一些关于基于框架的错误处理、重试和其他功能以及与之相关的配置属性的参考资料。重要的是要了解它们只影响命令式函数,当涉及到响应式函数时,您不应该有相同的期望。这就是原因......
响应式函数和命令式函数之间存在根本区别。
命令式函数是一个消息处理程序,由框架在收到的每条消息上调用。因此,对于 N 条消息,将有 N 次此类函数的调用,因此我们可以包装此类函数并添加其他功能,例如错误处理、重试等。
响应式函数是初始化函数。它仅调用一次以获取对用户提供的 Flux/Mono 的引用,以与框架提供的 Flux/Mono 连接。在那之后,我们(框架)对流完全没有可见性或控制权。
因此,对于响应式函数,在错误处理和重试(即 |
功能组成
使用函数式编程模型,您还可以从函数组合中受益,其中您可以从一组简单函数动态组合复杂的处理程序。 例如,让我们将以下函数 bean 添加到上面定义的应用程序中
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}
并修改spring.cloud.function.definition
属性来反映您从 'toUpperCase' 和 'wrapInQuotes' 编写新函数的意图。
为此,Spring Cloud Function 依赖于|
(管道)符号。因此,为了完成我们的示例,我们的属性现在将如下所示:
--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function 提供的函数组合支持的一大好处是事实上,您可以编写响应式函数和命令式函数。 |
组合的结果是一个函数,正如您可能猜到的那样,它可以有一个非常长且相当神秘的名称(例如,foo|bar|baz|xyz. . .
) 当涉及到其他配置属性时,这带来了很大的不便。这就是功能绑定名称部分中描述的描述性绑定名称功能可以提供帮助的地方。
例如,如果我们想给我们的toUpperCase|wrapInQuotes
我们可以这样做的更具描述性的名称
具有以下属性spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput
允许
引用该绑定名称的其他配置属性(例如spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination
).
功能组成和跨领域关注点
函数组合有效地允许您通过分解复杂性来解决复杂性 到一组简单且可单独管理/可测试的组件,这些组件仍然可以 在运行时表示为 1。但这并不是唯一的好处。
您还可以使用组合来解决某些横切的非功能性问题, 例如内容丰富。例如,假设您有一条传入消息,该消息可能 缺少某些标题,或者某些标题不处于您的业务的确切状态 函数会期望。您现在可以实现一个单独的函数来解决这些问题 关注点,然后将其与主要业务功能组合在一起。
让我们看一下示例
@SpringBootApplication
public class DemoStreamApplication {
public static void main(String[] args) {
SpringApplication.run(DemoStreamApplication.class,
"--spring.cloud.function.definition=enrich|echo",
"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
"--spring.cloud.stream.bindings.input.destination=myDestination",
"--spring.cloud.stream.bindings.input.group=myGroup");
}
@Bean
public Function<Message<String>, Message<String>> enrich() {
return message -> {
Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
};
}
@Bean
public Function<Message<String>, Message<String>> echo() {
return message -> {
Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
System.out.println("Incoming message " + message);
return message;
};
}
}
虽然微不足道,但此示例演示了一个函数如何使用附加标头(非功能性问题)来丰富传入的消息,
所以另一个函数 -echo
- 可以从中受益。这echo
函数保持干净,仅专注于业务逻辑。
您还可以查看spring.cloud.stream.function.bindings
属性来简化组合绑定名称。
具有多个输入和输出参数的函数
从 3.0 版开始,spring-cloud-stream 支持以下函数: 具有多个输入和/或多个输出(返回值)。这实际上意味着什么以及 它针对什么类型的用例?
-
大数据:想象一下您正在处理的数据源高度无组织,并且包含各种类型的数据元素 (例如,订单、交易等),您需要有效地对其进行整理。
-
数据聚合:另一个用例可能需要您合并来自 2+ 传入_streams的数据元素。
以上仅描述了几个用例,在这些用例中,您可能需要使用单个函数来接受和/或生成 多个数据流。这就是我们在这里针对的用例类型。
另外,请注意此处对流概念的强调略有不同。假设此类函数仅有价值
如果他们被授予对实际数据流(而不是单个元素)的访问权限。因此,我们依赖于
Project Reactor 提供的抽象(即Flux
和Mono
) 上已经在
classpath 作为 spring-cloud-functions 引入的依赖项的一部分。
另一个重要方面是多个输入和输出的表示。虽然 java 提供了
各种不同的抽象来表示这些抽象的多个
是 a) 无界的,b) 缺乏 arity 和 c) 缺乏类型信息,这些信息在这种情况下都很重要。
举个例子,让我们看看Collection
或者一个数组,它只允许我们
描述单个类型的多个或将所有内容上调到Object
,影响的透明类型转换功能
spring-cloud-stream 等。
因此,为了满足所有这些要求,初始支持依赖于使用另一个抽象的签名 由 Project Reactor - Tuples 提供。但是,我们正在努力允许更灵活的签名。
请参阅绑定和绑定名称部分,了解用于建立此类应用程序使用的绑定名称的命名约定。 |
让我们看几个示例:
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return tuple -> {
Flux<String> stringStream = tuple.getT1();
Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
};
}
}
上面的示例演示了接受两个输入(第一个类型String
和第二种类型Integer
)
并生成类型的单个输出String
.
因此,对于上面的示例,两个输入绑定将是gather-in-0
和gather-in-1
为了保持一致性
输出绑定也遵循相同的约定,并命名为gather-out-0
.
了解这一点将允许您设置绑定特定属性。
例如,以下内容将覆盖gather-in-0
捆绑:
--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
}
上面的例子与前面的示例有点相反,并演示了函数
接受类型的单个输入Integer
并产生两个输出(均为String
).
因此,对于上面的示例,输入绑定为scatter-in-0
和
输出绑定是scatter-out-0
和scatter-out-1
.
你用以下代码对其进行测试:
@Test
public void testSingleInputMultiOutput() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleApplication.class))
.run("--spring.cloud.function.definition=scatter")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
for (int i = 0; i < 10; i++) {
inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
}
int counter = 0;
for (int i = 0; i < 5; i++) {
Message<byte[]> even = outputDestination.receive(0, 0);
assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
Message<byte[]> odd = outputDestination.receive(0, 1);
assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
}
}
}
单个应用程序中的多种功能
可能还需要在单个应用程序中对多个消息处理程序进行分组。您可以通过以下方式做到这一点 定义多个功能。
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
在上面的示例中,我们有定义两个函数的配置uppercase
和reverse
.
因此,首先,如前所述,我们需要注意存在冲突(多个函数),因此
我们需要通过提供spring.cloud.function.definition
指向实际函数的属性
我们想绑定。除了这里我们将使用分隔符来指向这两个函数(请参阅下面的测试用例)。;
与具有多个输入/输出的函数一样,请参阅绑定和绑定名称部分以了解命名 用于建立此类应用程序使用的绑定名称的约定。 |
你用以下代码对其进行测试:
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
ReactiveFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-1");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
批处理使用者
使用MessageChannelBinder
支持批量监听器,并且为消费者绑定启用了该功能,您可以将spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
自true
以启用
要传递给函数的整批消息List
.
@Bean
public Function<List<Person>, Person> findFirstPerson() {
return persons -> persons.get(0);
}
批量生产者
您还可以通过返回 Messages 的集合来在生产者端使用批处理的概念,该集合有效地提供了 相反效应,其中集合中的每条消息都将由活页夹单独发送。
考虑以下函数:
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
返回列表中的每条消息将单独发送,从而将四条消息发送到输出目标。
Spring Integration 流作为函数
实现函数时,您可能有适合该类别的复杂需求 企业集成模式 (EIP)。最好使用 框架,例如 Spring Integration(SI),它是 EIP 的参考实现。
值得庆幸的是,SI 已经支持通过集成流作为网关将集成流公开为函数考虑以下示例:
@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
}
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(MessageFunction.class, "uppercase")
.<String, String>transform(String::toUpperCase)
.logAndReply(LoggingHandler.Level.WARN);
}
public interface MessageFunction extends Function<Message<String>, Message<String>> {
}
}
对于熟悉 SI 的人,您可以看到我们定义了一个类型为IntegrationFlow
我们在哪里
将我们要公开为的集成流声明为Function<String, String>
(使用 SI DSL)调用uppercase
.
这MessageFunction
接口允许我们显式声明输入和输出的类型,以便进行适当的类型转换。
有关类型转换的更多信息,请参阅内容类型协商部分。
要接收原始输入,您可以使用from(Function.class, …)
.
生成的函数绑定到目标绑定器公开的输入和输出目标。
请参阅绑定和绑定名称部分以了解命名 用于建立此类应用程序使用的绑定名称的约定。 |
有关 Spring Integration 和 Spring Cloud Stream 互作性的更多详细信息,特别是围绕函数式编程模型 您可能会发现这篇文章非常有趣,因为它深入探讨了 通过合并 Spring Integration 和 Spring Cloud Stream/Functions 的优点来应用各种模式。
使用轮询的消费者
概述
使用轮询的消费者时,轮询PollableMessageSource
按需。
要为轮询的消费者定义绑定,您需要提供spring.cloud.stream.pollable-source
财产。
请考虑以下轮询使用者绑定的示例:
--spring.cloud.stream.pollable-source=myDestination
可轮询源名称myDestination
在前面的示例中,将导致myDestination-in-0
保留的绑定名称
与函数式编程模型一致。
给定前面示例中轮询的使用者,您可以按如下方式使用它:
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition()) {
try {
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}
一种不那么手动且更像 Spring 的替代方法是配置一个计划任务 bean。例如
@Scheduled(fixedDelay = 5_000)
public void poll() {
System.out.println("Polling...");
this.source.poll(m -> {
System.out.println(m.getPayload());
}, new ParameterizedTypeReference<Foo>() { });
}
这PollableMessageSource.poll()
方法采用MessageHandler
参数(通常是 lambda 表达式,如下所示)。它返回true
如果收到并成功处理了消息。
与消息驱动的消费者一样,如果MessageHandler
抛出异常,消息发布到错误通道,如Error Handling
.
通常,poll()
方法在MessageHandler
出口。 如果该方法异常退出,则消息将被拒绝(而不是重新排队),但请参阅处理错误。您可以通过对确认负责来覆盖该行为,如以下示例所示:
@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
return args -> {
while (someCondition()) {
if (!dest1In.poll(m -> {
StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
// e.g. hand off to another thread which can perform the ack
// or acknowledge(Status.REQUEUE)
})) {
Thread.sleep(1000);
}
}
};
}
您必须ack (或nack ) 消息,以避免资源泄漏。 |
某些消息传递系统(如Apache Kafka)在日志中维护一个简单的偏移量。如果传递失败并重新排队StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); ,则重新传递任何后来成功确认的邮件。 |
还有一个过载的poll
方法,其定义如下:
poll(MessageHandler handler, ParameterizedTypeReference<?> type)
这type
是一个转换提示,允许转换传入消息有效负载,如以下示例所示:
boolean result = pollableSource.poll(received -> {
Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
...
}, new ParameterizedTypeReference<Map<String, Foo>>() {});
处理错误
默认情况下,为可轮询源配置错误通道;如果回调抛出异常,则ErrorMessage
被发送到错误通道 (<destination>.<group>.errors
);此错误通道也桥接到全局 Spring IntegrationerrorChannel
.
您可以使用@ServiceActivator
处理错误;如果没有订阅,则错误将被简单地记录下来,并且消息将被确认为成功。
如果错误通道服务激活器引发异常,则该消息将被拒绝(默认),并且不会重新传递。
如果服务激活器抛出RequeueCurrentMessageException
,消息将在代理处重新排队,并在后续轮询中再次检索。
如果监听器抛出RequeueCurrentMessageException
如上所述,消息将直接重新排队,并且不会发送到错误通道。
事件路由
在 Spring Cloud Stream 的上下文中,事件路由是 a) 将事件路由到特定事件订阅者或 b) 将事件订阅者生成的事件路由到特定目的地的能力。 在这里,我们将其称为路由“TO”和路由“FROM”。
路由到消费者
路由可以依靠RoutingFunction
在 Spring Cloud Function 3.0 中可用。您需要做的就是通过--spring.cloud.stream.function.routing.enabled=true
应用程序属性或提供spring.cloud.function.routing-expression
财产。
启用后RoutingFunction
将绑定到输入目的地
接收所有消息并根据提供的指令将它们路由到其他功能。
为了绑定,路由目标的名称是functionRouter-in-0 (请参阅RoutingFunction.FUNCTION_NAME和绑定命名约定功能绑定名称)。 |
可以使用单个消息以及应用程序属性提供指令。
下面是几个示例:
使用邮件头
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class,
"--spring.cloud.stream.function.routing.enabled=true");
}
@Bean
public Consumer<String> even() {
return value -> {
System.out.println("EVEN: " + value);
};
}
@Bean
public Consumer<String> odd() {
return value -> {
System.out.println("ODD: " + value);
};
}
}
通过向functionRouter-in-0
绑定器公开的目的地(即 rabbit、kafka),
此类消息将路由到适当的(“偶数”或“奇数”)使用者。
默认情况下RoutingFunction
会寻找一个spring.cloud.function.definition
或spring.cloud.function.routing-expression
(对于使用 SpEL 的更动态场景)
标头,如果找到,其值将被视为路由指令。
例如
设置spring.cloud.function.routing-expression
header 到值T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd'
最终将半随机地将请求路由到任一odd
或even
功能。
此外,对于 SpEL,求值上下文的根对象是Message
因此,您也可以对单个标头(或消息)进行评估….routing-expression=headers['type']
使用应用程序属性
这spring.cloud.function.routing-expression
和/或spring.cloud.function.definition
可以作为应用程序属性传递(例如spring.cloud.function.routing-expression=headers['type']
.
@SpringBootApplication
public class RoutingStreamApplication {
public static void main(String[] args) {
SpringApplication.run(RoutingStreamApplication.class,
"--spring.cloud.function.routing-expression="
+ "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
}
@Bean
public Consumer<Integer> even() {
return value -> System.out.println("EVEN: " + value);
}
@Bean
public Consumer<Integer> odd() {
return value -> System.out.println("ODD: " + value);
}
}
通过应用程序属性传递指令对于响应式函数尤为重要,因为响应式函数 函数仅调用一次以传递发布者,因此对单个项的访问受到限制。 |
路由功能和输出绑定
RoutingFunction
是一个Function
因此,对待方式与任何其他职能没有什么不同。井。。。几乎。
什么时候RoutingFunction
到另一个路由Function
,其输出被发送到RoutingFunction
哪
是functionRouter-in-0
不出所料。但如果RoutingFunction
路由到Consumer
?换句话说,调用的结果
的RoutingFunction
可能不会产生任何要发送到输出绑定的内容,因此甚至有必要有一个。
所以,我们确实对待RoutingFunction
当我们创建绑定时,情况略有不同。即使它对您作为用户是透明的
(你真的没什么可做的),了解一些机制将有助于你理解它的内部运作方式。
所以,规则是;
我们从不为RoutingFunction
,仅输入。因此,当您路由到Consumer
这RoutingFunction
有效
变成Consumer
通过没有任何输出绑定。但是,如果RoutingFunction
碰巧路由到另一个Function
产生
输出,输出绑定RoutingFunction
将动态创建,此时RoutingFunction
将作为常规Function
关于绑定(同时具有输入和输出绑定)。
路由 FROM 消费者
除了静态目标之外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目标。 例如,当需要在运行时确定目标目标时,这很有用。 应用程序可以通过以下两种方式之一执行此作。
spring.cloud.stream.sendto.destination
您还可以委托给框架,通过指定spring.cloud.stream.sendto.destination
页眉
设置为要解析的目标的名称。
请考虑以下示例:
@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {
@Bean
public Function<String, Message<String>> destinationAsPayload() {
return value -> {
return MessageBuilder.withPayload(value)
.setHeader("spring.cloud.stream.sendto.destination", value).build();};
}
}
尽管在这个例子中你可以清楚地看到微不足道,但我们的输出是一个带有spring.cloud.stream.sendto.destination
页眉
设置为 he 输入参数的值。框架将查阅此标头,并尝试创建或发现
具有该名称的目标,并向其发送输出。
如果预先知道目标名称,则可以像配置任何其他目标一样配置生产者属性。
或者,如果您注册了NewDestinationBindingCallback<>
bean,则在创建绑定之前调用它。
回调采用绑定器使用的扩展生产者属性的泛型类型。
它有一个方法:
void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
以下示例显示如何使用 RabbitMQ 绑定器:
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
如果您需要支持具有多种活页夹类型的动态目标,请使用Object 对于泛型类型,并将extended 参数。 |
另外,请参阅[使用StreamBridge]部分,了解如何在类似情况下使用另一个选项(StreamBridge)。
后期处理(发送消息后)
调用函数后,其结果将由框架发送到目标目的地,从而有效地完成函数调用周期。
但是,从业务角度来看,这样的周期可能要等到完成该周期后执行一些额外的任务才能完全完成。
虽然这可以通过简单的组合来实现Consumer
和StreamBridge
正如这篇 Stack Overflow 帖子中所述,从 4.0.3 版本开始,框架
提供了一种更惯用的方法来解决这个问题,方法是PostProcessingFunction
由 Spring Cloud Function 项目提供。
这PostProcessingFunction
是一个特殊的半标记函数,其中包含一个附加方法postProcess(Message>)
设计
为实现此类后处理任务提供场所。
package org.springframework.cloud.function.context . . . public interface PostProcessingFunction<I, O> extends Function<I, O> { default void postProcess(Message<O> result) { } }
所以,现在你有两个选择。
选项 1:您可以将函数实现为PostProcessingFunction
并通过实现其postProcess(Message>)
方法。
private static class Uppercase implements PostProcessingFunction<String, String> { @Override public String apply(String input) { return input.toUpperCase(); } @Override public void postProcess(Message<String> result) { System.out.println("Function Uppercase has been successfully invoked and its result successfully sent to target destination"); } } . . . @Bean public Function<String, String> uppercase() { return new Uppercase(); }
选项 2:如果您已经有一个现有函数并且不想更改其实现或希望将函数保留为 POJO,您可以简单地仅实现postProcess(Message>)
方法,并将这个新的后处理函数与其他函数组合在一起。
private static class Logger implements PostProcessingFunction<?, String> { @Override public void postProcess(Message<String> result) { System.out.println("Function has been successfully invoked and its result successfully sent to target destination"); } } . . . @Bean public Function<String, String> uppercase() { return v -> v.toUpperCase(); } @Bean public Function<String, String> logger() { return new Logger(); } . . . // and then have your function definition as such `uppercase|logger`
注意:
在函数组合的情况下,只有PostProcessingFunction
(如果存在)将生效。例如,假设您有
以下函数定义 -foo|bar|baz
以及两者foo
和baz
是PostProcessingFunction
.只baz.postProcess(Message>)
将被调用。
如果baz
不是PostProcessingFunction
,则不会执行后处理功能。
有人可能会争辩说,您可以通过简单地将后处理器组合为另一个函数组合来轻松做到这一点Function
.然而,这确实是一种可能性
在这种情况下,后处理功能将在调用上一个函数之后和消息发送到目标目标之前立即调用
这是在函数调用周期完成之前。
错误处理
在本节中,我们将解释框架提供的错误处理机制背后的一般思想。 我们将使用 Rabbit 活页夹作为示例,因为各个活页夹定义了不同的集合 特定于底层代理功能(例如 Kafka 绑定器)的某些受支持机制的属性。
错误时有发生,Spring Cloud Stream 提供了几种灵活的机制来处理它们。请注意,这些技术取决于 binder 实现和 底层消息传递中间件的功能以及编程模型(稍后会详细介绍)。
每当消息处理程序(函数)抛出异常时,它都会传播回绑定器,此时绑定器将多次尝试重试
相同的消息(默认为 3)使用RetryTemplate
由 Spring Retry 库提供。
如果重试不成功,则取决于错误处理机制,该机制可能会删除消息、将消息重新排队以重新处理或将失败的消息发送到 DLQ。
Rabbit 和 Kafka 都支持这些概念(尤其是 DLQ)。但是,其他绑定器可能不支持,因此请参阅您各个绑定器的文档,了解有关支持的错误处理选项的详细信息。
但是请记住,响应式函数不符合消息处理程序的条件,因为它不处理单个消息,并且相反,提供了一种将框架提供的流(即 Flux)与用户提供的流连接起来的方法。为什么这很重要?这是因为您在本节后面阅读的有关重试模板、删除失败消息、重试、DLQ 和协助所有这些的配置属性仅适用于消息处理程序(即命令式函数)。
Reactive API 提供了一个非常丰富的自己的运算符和机制库,以帮助您处理特定于
各种响应式用例,它们比简单的消息处理程序情况复杂得多,所以使用它们,例如
如public final Flux<T> retryWhen(Retry retrySpec);
你可以在reactor.core.publisher.Flux
.
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux
.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
.map(v -> v.toUpperCase());
}
丢弃失败的邮件
默认情况下,系统提供错误处理程序。第一个错误处理程序将简单地记录错误消息。第二个错误处理程序是特定于绑定程序的错误处理程序 它负责在特定消息传递系统的上下文中处理错误消息(例如,发送到 DLQ)。但是,由于没有提供其他错误处理配置(在当前情况下),因此此处理程序将不会执行任何作。因此,基本上在记录后,消息将被删除。
虽然在某些情况下是可以接受的,但在大多数情况下,它不是,我们需要一些恢复机制来避免消息丢失。
处理错误消息
在上一节中,我们提到,默认情况下,导致错误的消息会被有效地记录和删除。框架还为你公开了机制
提供自定义错误处理程序(即发送通知或写入数据库等)。您可以通过添加Consumer
专门设计用于接受ErrorMessage
除了有关错误的所有信息(例如,堆栈跟踪等)之外,还包含原始消息(触发错误的消息)。
注意:自定义错误处理程序与框架提供的错误处理程序(即日志记录和绑定程序错误处理程序 - 请参阅上一节)是互斥的,以确保它们不会干扰。
@Bean
public Consumer<ErrorMessage> myErrorHandler() {
return v -> {
// send SMS notification code
};
}
要将此类消费者识别为错误处理程序,您只需提供error-handler-definition
指向函数名称的属性 -spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler
.
例如,对于绑定名称uppercase-in-0
该属性将如下所示:
spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler
如果您使用特殊的映射指令将绑定映射到更易读的名称 -spring.cloud.stream.function.bindings.uppercase-in-0=upper
,则此属性将如下所示:
spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果您意外地将此类处理程序声明为Function ,它仍然可以工作,但不会对其输出执行任何作。但是,鉴于此类处理程序仍然依赖于 Spring Cloud Function 提供的功能,如果您的处理程序具有一些复杂性,您希望通过函数组合解决这些复杂性(尽管可能性不大),您也可以从函数组合中受益。 |
默认错误处理程序
如果要为所有函数 bean 使用单个错误处理程序,则可以使用标准的 spring-cloud-stream 机制来定义默认属性spring.cloud.stream.default.error-handler-definition=myErrorHandler
DLQ - 死信队列
DLQ 可能是最常见的机制,它允许将失败的消息发送到一个特殊的目的地:死信队列。
配置后,失败的消息将发送到此目标,以便后续重新处理或审核和协调。
请考虑以下示例:
@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleStreamApplication.class,
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
"--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
);
}
@Bean
public Function<Person, Person> uppercase() {
return personIn -> {
throw new RuntimeException("intentional");
});
};
}
}
提醒一下,在此示例中uppercase-in-0
segment 对应于输入目标绑定的名称。
这consumer
segment 表示它是消费者属性。
使用 DLQ 时,至少group 必须提供属性才能正确命名 DLQ 目标。然而group 经常一起使用
跟destination 属性,如我们的示例所示。 |
除了一些标准属性之外,我们还设置了auto-bind-dlq
指示活页夹创建和配置 DLQ 目标uppercase-in-0
binding 对应于uppercase
destination(请参阅相应的属性),这会导致一个名为uppercase.myGroup.dlq
(有关 Kafka 特定的 DLQ 属性,请参阅 Kafka 文档)。
配置完成后,所有失败的消息都将路由到此目标,并保留原始消息以供进一步作。
并且可以看到错误消息包含与原始错误相关的更多信息,如下所示:
. . . .
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at. . . . .
Payload: blah
您还可以通过设置max-attempts
设置为“1”。例如
--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1
重试模板
在本节中,我们将介绍与重试功能配置相关的配置属性。
这RetryTemplate
是 Spring Retry 库的一部分。
虽然它超出了本文档的范围,但涵盖RetryTemplate
我们
会提到以下具体相关的消费属性
这RetryTemplate
:
- 最大尝试次数
-
尝试处理邮件的次数。
默认值:3。
- backOff初始间隔
-
重试时的回退初始间隔。
默认为 1000 毫秒。
- backOffMax间隔
-
最大回退间隔。
默认为 10000 毫秒。
- backOff乘数
-
退避乘数。
默认 2.0。
- default可重试
-
监听器抛出的异常是否未在
retryableExceptions
是可重试的。违约:
true
. - retryableExceptions
-
键中的 Throwable 类名和值中的布尔值的映射。 指定那些将重试或不会重试的异常(和子类)。 另请参阅
defaultRetriable
. 例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
.默认值:空。
虽然上述设置足以满足大多数自定义要求,但它们可能无法满足某些复杂的要求,在这些要求中,这些要求
Point 你可能想要提供你自己的RetryTemplate
.为此,请在应用程序配置中将其配置为 bean。申请提供的
实例将覆盖框架提供的实例。此外,为了避免冲突,您必须限定RetryTemplate
您希望被活页夹使用
如@StreamRetryTemplate
.例如
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
return new RetryTemplate();
}
从上面的例子中可以看出,你不需要用@Bean
因为@StreamRetryTemplate
是合格的@Bean
.
如果您需要更精确地使用RetryTemplate
,您可以在ConsumerProperties
关联
每个绑定的特定重试 Bean。
spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>
粘合剂
Spring Cloud Stream 提供了一个 Binder 抽象,用于连接到外部中间件的物理目标。 本节提供有关 Binder SPI 背后的主要概念、其主要组件以及特定于实现的详细信息的信息。
生产者和消费者
下图显示了生产者和消费者的一般关系:

生产者是将消息发送到绑定目标的任何组件。
绑定目标可以通过Binder
该代理的实现。
调用bindProducer()
方法,第一个参数是代理中目标的名称,第二个参数是生产者向其发送消息的本地目标实例,第三个参数包含要在为该绑定目标创建的适配器中使用的属性(例如分区键表达式)。
使用者是从绑定目标接收消息的任何组件。
与生产者一样,消费者可以绑定到外部消息代理。
调用bindConsumer()
方法,第一个参数是目标名称,第二个参数提供使用者逻辑组的名称。
由给定目标的使用者绑定表示的每个组都会接收生产者发送到该目标的每条消息的副本(即,它遵循正常的发布-预订语义)。
如果有多个使用者实例绑定了相同的组名称,则消息将在这些使用者实例之间进行负载均衡,以便生产者发送的每条消息仅由每个组中的单个使用者实例使用(即,它遵循正常的排队语义)。
Binder SPI
Binder SPI 由许多接口、开箱即用的实用程序类和发现策略组成,这些策略提供了用于连接到外部中间件的可插拔机制。
SPI 的关键点是Binder
接口,这是一种将输入和输出连接到外部中间件的策略。以下列表显示了Binder
接口:
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
Binding<T> bindConsumer(String bindingName, String group, T inboundBindTarget, C consumerProperties);
Binding<T> bindProducer(String bindingName, T outboundBindTarget, P producerProperties);
}
该接口是参数化的,提供了许多扩展点:
-
输入和输出绑定目标。
-
扩展的使用者和生产者属性,允许特定的 Binder 实现添加可以以类型安全的方式支持的补充属性。
典型的 Binder 实现包括以下内容:
-
实现
Binder
接口; -
Spring
@Configuration
创建类型为Binder
以及中间件连接基础设施。 -
一个
META-INF/spring.binders
在包含一个或多个 Binder 定义的类路径上找到的文件,如以下示例所示:kafka:\ org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
前面已经提到,Binder 抽象也是框架的扩展点之一。因此,如果您在前面的列表中找不到合适的 binder,您可以在 Spring Cloud Stream 之上实现自己的 binder。
在如何从头开始创建 Spring Cloud Stream Binder 发布社区成员文档中
详细介绍,通过示例,实现自定义活页夹所需的一组步骤。
这些步骤也在Implementing Custom Binders 部分。 |
粘合剂检测
Spring Cloud Stream 依赖于 Binder SPI 的实现来执行将用户代码连接到(绑定)到消息代理的任务。 每个 Binder 实现通常连接到一种类型的消息传递系统。
类路径检测
默认情况下,Spring Cloud Stream 依赖于 Spring Boot 的自动配置来配置绑定过程。 如果在类路径上找到单个 Binder 实现,Spring Cloud Stream 会自动使用它。 例如,一个旨在仅绑定到 RabbitMQ 的 Spring Cloud Stream 项目可以添加以下依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
有关其他 Binder 依赖项的特定 Maven 坐标,请参阅该 Binder 实现的文档。
类路径上的多个绑定器
当类路径上存在多个绑定器时,应用程序必须指示每个目标绑定要使用哪个绑定器。
每个活页夹配置都包含一个META-INF/spring.binders
文件,这是一个简单的属性文件,如以下示例所示:
rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration
其他提供的 Binder 实现(例如 Kafka)也存在类似的文件,并且自定义 Binder 实现也应该提供它们。
键表示绑定器实现的标识名称,而值是以逗号分隔的配置类列表,每个配置类包含一个且只有一个类型的 Bean 定义org.springframework.cloud.stream.binder.Binder
.
活页夹选择可以全局执行,使用spring.cloud.stream.defaultBinder
属性(例如,spring.cloud.stream.defaultBinder=rabbit
)或通过在每个绑定上配置活页夹来单独配置活页夹。
例如,处理器应用程序(具有名为input
和output
分别用于读取和写入),从 Kafka 读取并写入 RabbitMQ 可以指定以下配置:
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit
连接到多个系统
默认情况下,绑定器共享应用程序的 Spring Boot 自动配置,以便在类路径上找到的每个绑定器创建一个实例。 如果应用程序应连接到多个相同类型的代理,则可以指定多个绑定器配置,每个配置具有不同的环境设置。
打开显式活页夹配置会完全禁用默认活页夹配置过程。
如果这样做,则配置中必须包含所有正在使用的活页夹。
打算透明地使用 Spring Cloud Stream 的框架可能会创建可以通过名称引用的 Binder 配置,但它们不会影响默认的 Binder 配置。
为此,活页夹配置可能具有其defaultCandidate 标志设置为 false(例如,spring.cloud.stream.binders.<configurationName>.defaultCandidate=false ).
这表示独立于默认活页夹配置过程而存在的配置。 |
以下示例显示了连接到两个 RabbitMQ 代理实例的处理器应用程序的典型配置:
spring:
cloud:
stream:
bindings:
input:
destination: thing1
binder: rabbit1
output:
destination: thing2
binder: rabbit2
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: <host1>
rabbit2:
type: rabbit
environment:
spring:
rabbitmq:
host: <host2>
这environment 属性也可以用于任何 Spring Boot 属性,
包括这个spring.main.sources 这对于为
特定的绑定器,例如覆盖自动配置的 bean。 |
例如;
environment:
spring:
main:
sources: com.acme.config.MyCustomBinderConfiguration
要为特定活页夹环境激活特定配置文件,您应该使用spring.profiles.active
财产:
environment:
spring:
profiles:
active: myBinderProfile
在多粘合剂应用中定制粘合剂
当应用程序中有多个绑定器并想要自定义绑定器时,可以通过提供BinderCustomizer
实现。
对于具有单个 Binder 的应用程序,不需要此特殊定制器,因为 Binder 上下文可以直接访问自定义 Bean。
但是,在多绑定器方案中并非如此,因为不同的绑定器位于不同的应用程序上下文中。
通过提供BinderCustomizer
接口,绑定器虽然驻留在不同的应用程序上下文中,但将接收自定义。
Spring Cloud Stream 确保在应用程序开始使用绑定器之前进行自定义。
用户必须检查活页夹类型,然后应用必要的自定义项。
下面是一个提供BinderCustomizer
豆。
@Bean
public BinderCustomizer binderCustomizer() {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setRebalanceListener(...);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}
请注意,当有多个相同类型的活页夹实例时,活页夹名称可用于过滤自定义。
绑定可视化和控制
Spring Cloud Stream 支持通过 Actuator 端点以及编程方式对 Bindings 进行可视化和控制。
程序化方式
从 3.1 版本开始,我们公开了org.springframework.cloud.stream.binding.BindingsLifecycleController
注册为 bean 并一次
注入可用于控制单个绑定的生命周期
例如,查看其中一个测试用例中的片段。如您所见,我们检索BindingsLifecycleController
从 Spring 应用程序上下文中执行单个方法来控制echo-in-0
捆绑。。
BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();
驱动器
由于执行器和 Web 是可选的,因此必须首先添加其中一个 Web 依赖项,并手动添加执行器依赖项。以下示例显示如何添加 Web 框架的依赖项:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
以下示例显示如何添加 WebFlux 框架的依赖项:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
您可以按如下方式添加 Actuator 依赖项:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在 Cloud Foundry 中运行 Spring Cloud Stream 2.0 应用程序,您必须将spring-boot-starter-web 和spring-boot-starter-actuator 到类路径。否则,应用程序将由于运行状况检查失败而无法启动。 |
您还必须启用bindings
执行器端点,通过设置以下属性:--management.endpoints.web.exposure.include=bindings
.
满足这些前提条件后。应用程序启动时,您应该在日志中看到以下内容:
: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . . : Mapped "{[/actuator/bindings],methods=[GET]. . . : Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .
要可视化当前绑定,请访问以下 URL:http://<host>:<port>/actuator/bindings
或者,要查看单个绑定,请访问类似于以下内容的 URL 之一:http://<host>:<port>/actuator/bindings/<bindingName>;
您还可以通过发布到同一 URL 来停止、启动、暂停和恢复单个绑定,同时提供state
参数为 JSON,如以下示例所示:
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSED 和RESUMED 仅当相应的绑定器及其底层技术支持它时才有效。否则,您会在日志中看到警告消息。目前,只有 Kafka 和 [Solace](https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) 绑定器支持PAUSED 和RESUMED 国家。 |
活页夹配置属性
自定义活页夹配置时,以下属性可用。这些属性通过org.springframework.cloud.stream.config.BinderProperties
它们必须以spring.cloud.stream.binders.<configurationName>
.
- 类型
-
活页夹类型。 它通常引用在类路径上找到的绑定器之一——特别是
META-INF/spring.binders
文件。默认情况下,它与配置名称具有相同的值。
- inheritEnvironment (继承环境)
-
配置是否继承应用程序本身的环境。
违约:
true
. - 环境
-
Root 表示一组可用于自定义绑定器环境的属性。 设置此属性后,在其中创建活页夹的上下文不是应用程序上下文的子项。 此设置允许粘合剂组分和应用组分之间完全分离。
违约:
empty
. - 默认候选
-
活页夹配置是被视为默认活页夹的候选者,还是只能在显式引用时使用。 此设置允许添加活页夹配置,而不会干扰默认处理。
违约:
true
.
实现自定义活页夹
为了实现自定义Binder
,您所需要的只是:
-
添加所需的依赖项
-
提供 ProvisioningProvider 实现
-
提供 MessageProducer 实现
-
提供 MessageHandler 实现
-
提供 Binder 实现
-
创建活页夹配置
-
在 META-INF/spring.binders 中定义您的 binder
添加所需的依赖项
添加spring-cloud-stream
依赖于您的项目(例如,Maven):
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>${spring.cloud.stream.version}</version>
</dependency>
提供 ProvisioningProvider 实现
这ProvisioningProvider
负责供应使用者和生产者目标,并且需要转换物理目标引用中的 application.yml 或 application.properties 文件中包含的逻辑目标。
下面是 ProvisioningProvider 实现示例,该示例仅修剪通过输入/输出绑定配置提供的目标:
public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {
@Override
public ProducerDestination provisionProducerDestination(
final String name,
final ProducerProperties properties) {
return new FileMessageDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(
final String name,
final String group,
final ConsumerProperties properties) {
return new FileMessageDestination(name);
}
private class FileMessageDestination implements ProducerDestination, ConsumerDestination {
private final String destination;
private FileMessageDestination(final String destination) {
this.destination = destination;
}
@Override
public String getName() {
return destination.trim();
}
@Override
public String getNameForPartition(int partition) {
throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
}
}
}
提供 MessageProducer 实现
这MessageProducer
负责使用事件并将其作为消息处理到配置为使用此类事件的客户端应用程序。
下面是 MessageProducer 实现的示例,它扩展了MessageProducerSupport
抽象,以便轮询与修剪后的目标名称匹配且位于项目路径中的文件,同时存档读取消息并丢弃随后的相同消息:
public class FileMessageProducer extends MessageProducerSupport {
public static final String ARCHIVE = "archive.txt";
private final ConsumerDestination destination;
private String previousPayload;
public FileMessageProducer(ConsumerDestination destination) {
this.destination = destination;
}
@Override
public void doStart() {
receive();
}
private void receive() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(() -> {
String payload = getPayload();
if(payload != null) {
Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
archiveMessage(payload);
sendMessage(receivedMessage);
}
}, 0, 50, MILLISECONDS);
}
private String getPayload() {
try {
List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
String currentPayload = allLines.get(allLines.size() - 1);
if(!currentPayload.equals(previousPayload)) {
previousPayload = currentPayload;
return currentPayload;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
private void archiveMessage(String payload) {
try {
Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
在实现自定义绑定器时,此步骤并不是严格强制性的,因为您始终可以求助于使用现有的 MessageProducer 实现! |
提供 MessageHandler 实现
这MessageHandler
提供生成事件所需的逻辑。
以下是 MessageHandler 实现的示例:
public class FileMessageHandler implements MessageHandler{
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//write message to file
}
}
在实现自定义绑定器时,此步骤并不是严格强制性的,因为您始终可以求助于使用现有的 MessageHandler 实现! |
提供 Binder 实现
您现在可以提供自己的Binder
抽象化。这可以通过以下方式轻松完成:
-
扩展
AbstractMessageChannelBinder
类 -
将 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的泛型参数
-
覆盖
createProducerMessageHandler
和createConsumerEndpoint
方法
例如:
public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {
public FileMessageBinder(
String[] headersToEmbed,
FileMessageBinderProvisioner provisioningProvider) {
super(headersToEmbed, provisioningProvider);
}
@Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination destination,
final ProducerProperties producerProperties,
final MessageChannel errorChannel) throws Exception {
return message -> {
String fileName = destination.getName();
String payload = new String((byte[])message.getPayload()) + "\n";
try {
Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
}
@Override
protected MessageProducer createConsumerEndpoint(
final ConsumerDestination destination,
final String group,
final ConsumerProperties properties) throws Exception {
return new FileMessageProducer(destination);
}
}
创建活页夹配置
严格要求您创建一个 Spring 配置来初始化 Binder 实现的 bean(以及您可能需要的所有其他 bean):
@Configuration
public class FileMessageBinderConfiguration {
@Bean
@ConditionalOnMissingBean
public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
return new FileMessageBinderProvisioner();
}
@Bean
@ConditionalOnMissingBean
public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
return new FileMessageBinder(null, fileMessageBinderProvisioner);
}
}
在 META-INF/spring.binders 中定义您的 binder
最后,您必须在META-INF/spring.binders
文件,同时指定 Binder 的名称和 Binder Configuration 类的完整限定名称:
myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration
配置选项
Spring Cloud Stream 支持常规配置选项以及绑定和绑定器的配置。 某些绑定器允许其他绑定属性支持特定于中间件的功能。
可以通过 Spring Boot 支持的任何机制向 Spring Cloud Stream 应用程序提供配置选项。 这包括应用程序参数、环境变量以及 YAML 或 .properties 文件。
绑定服务属性
这些属性通过org.springframework.cloud.stream.config.BindingServiceProperties
- spring.cloud.stream.instanceCount
-
应用程序的已部署实例数。 必须设置为在生产者端进行分区。使用 RabbitMQ 和 Kafka 时必须在消费者端设置,如果
autoRebalanceEnabled=false
.违约:
1
. - spring.cloud.stream.instance索引
-
应用程序的实例索引:来自
0
自instanceCount - 1
. 用于使用 RabbitMQ 和 Kafka 进行分区,如果autoRebalanceEnabled=false
. 在 Cloud Foundry 中自动设置以匹配应用程序的实例索引。 - spring.cloud.stream.dynamicDestinations
-
可以动态绑定的目标列表(例如,在动态路由方案中)。 如果设置,则只能绑定列出的目的地。
默认值:空(允许绑定任何目的地)。
- spring.cloud.stream.defaultBinder
-
如果配置了多个活页夹,则要使用的默认活页夹。 请参阅类路径上的多个绑定器。
默认值:空。
- spring.cloud.stream.overrideCloudConnectors
-
此属性仅适用于以下情况
cloud
profile 处于活动状态,并且应用程序提供了 Spring Cloud Connectors。 如果属性是false
(默认值),绑定器检测合适的绑定服务(例如,在 Cloud Foundry 中为 RabbitMQ 绑定器的 RabbitMQ 服务)并使用它来创建连接(通常通过 Spring Cloud Connectors)。 当设置为true
,此属性指示 Binder 完全忽略绑定的服务并依赖 Spring Boot 属性(例如,依赖于spring.rabbitmq.*
RabbitMQ 绑定器环境中提供的属性)。 此属性的典型用法是在连接到多个系统时嵌套在自定义环境中。违约:
false
. - spring.cloud.stream.binding重试间隔
-
例如,当绑定程序不支持延迟绑定并且代理(例如 Apache Kafka)关闭时,重试绑定创建之间的间隔(以秒为单位)。将其设置为零以将此类情况视为致命条件,从而阻止应用程序启动。
违约:
30
绑定属性
绑定属性通过使用spring.cloud.stream.bindings.<bindingName>.<property>=<value>
.
这<bindingName>
表示正在配置的绑定的名称。
例如,对于以下函数
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
有两个名为uppercase-in-0
用于输入和uppercase-out-0
用于输出。有关更多详细信息,请参阅绑定和绑定名称。
为了避免重复,Spring Cloud Stream 支持为所有绑定设置值,格式为spring.cloud.stream.default.<property>=<value>
和spring.cloud.stream.default.<producer|consumer>.<property>=<value>
用于常见的绑定属性。
当涉及到避免重复扩展绑定属性时,应使用以下格式 -spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>
.
常见绑定属性
这些属性通过org.springframework.cloud.stream.config.BindingProperties
以下绑定属性可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.
(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock
).
可以使用spring.cloud.stream.default
前缀(例如spring.cloud.stream.default.contentType=application/json
).
- 目的地
-
绑定中间件上绑定的目标目标(例如,RabbitMQ 交换或 Kafka 主题)。 如果绑定表示使用者绑定(输入),则可以将其绑定到多个目标,并且目标名称可以指定为逗号分隔
String
值。 如果没有,则改用实际的绑定名称。 无法覆盖此属性的默认值。 - 群
-
绑定的使用者组。 仅适用于入站绑定。 请参阅消费者组。
违约:
null
(表示匿名消费者)。 - 内容类型
-
此绑定的内容类型。 看
Content Type Negotiation
.违约:
application/json
. - 粘结 剂
-
此绑定使用的活页夹。 看
Multiple Binders on the Classpath
了解详情。违约:
null
(使用默认活页夹(如果存在)。
消费物业
这些属性通过org.springframework.cloud.stream.binder.ConsumerProperties
以下绑定属性仅适用于输入绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.consumer.
(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3
).
可以使用spring.cloud.stream.default.consumer
前缀(例如spring.cloud.stream.default.consumer.headerMode=none
).
- 自动启动
-
是否需要自动启动此使用者的信号
违约:
true
. - 并发
-
入站使用者的并发性。
违约:
1
. - 分区
-
使用者是否从分区生产者接收数据。
违约:
false
. - headerMode
-
当设置为
none
,禁用输入上的标头解析。 仅对本机不支持消息头且需要嵌入标头的消息传递中间件有效。 当不支持本机标头时,从非 Spring Cloud Stream 应用程序使用数据时,此选项非常有用。 当设置为headers
,它使用中间件的原生标头机制。 当设置为embeddedHeaders
,它将标头嵌入到消息有效负载中。默认值:取决于活页夹实现。
- 最大尝试次数
-
如果处理失败,则尝试处理消息的次数(包括第一次)。 设置为
1
以禁用重试。违约:
3
. - backOff初始间隔
-
重试时的回退初始间隔。
违约:
1000
. - backOffMax间隔
-
最大回退间隔。
违约:
10000
. - backOff乘数
-
退避乘数。
违约:
2.0
. - default可重试
-
监听器抛出的异常是否未在
retryableExceptions
是可重试的。违约:
true
. - 实例计数
-
当设置为大于等于零的值时,它允许自定义此消费者的实例计数(如果不同时
spring.cloud.stream.instanceCount
). 当设置为负值时,它默认为spring.cloud.stream.instanceCount
. 看Instance Index and Instance Count
了解更多信息。违约:
-1
. - 实例索引
-
当设置为大于等于零的值时,它允许自定义此消费者的实例索引(如果与
spring.cloud.stream.instanceIndex
). 当设置为负值时,它默认为spring.cloud.stream.instanceIndex
. 如果出现以下情况,则忽略instanceIndexList
被提供。 看Instance Index and Instance Count
了解更多信息。违约:
-1
. - 实例索引列表
-
与不支持原生分区的绑定器(如RabbitMQ)一起使用;允许应用程序实例从多个分区使用。
默认值:空。
- retryableExceptions
-
键中的 Throwable 类名和值中的布尔值的映射。 指定那些将重试或不会重试的异常(和子类)。 另请参阅
defaultRetriable
. 例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
.默认值:空。
- 使用原生解码
-
当设置为
true
,则入站消息由客户端库直接反序列化,必须相应地配置(例如,设置适当的 Kafka 生产者值反序列化器)。 使用此配置时,入站消息解组不是基于contentType
的绑定。 使用本机解码时,生产者负责使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。 此外,当使用本机编码和解码时,headerMode=embeddedHeaders
属性被忽略,并且标头不会嵌入到消息中。 查看生产者属性useNativeEncoding
.违约:
false
. - 多重
-
设置为 true 时,基础绑定器将本机多路复用同一输入绑定上的目标。
违约:
false
.
高级消费者配置
要对消息驱动的使用者的底层消息侦听器容器进行高级配置,请添加单个ListenerContainerCustomizer
bean 到应用程序上下文。
它将在应用上述属性后调用,并可用于设置其他属性。
同样,对于轮询的消费者,将MessageSourceCustomizer
豆。
以下是 RabbitMQ 绑定器的示例:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}
@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}
生产者属性
这些属性通过org.springframework.cloud.stream.binder.ProducerProperties
以下绑定属性仅适用于输出绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.producer.
(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
).
可以使用前缀spring.cloud.stream.default.producer
(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id
).
- 自动启动
-
是否需要自动启动此使用者的信号
违约:
true
. - partitionKey表达式
-
确定如何对出站数据进行分区的 SpEL 表达式。 如果设置,则此绑定上的出站数据将被分区。
partitionCount
必须设置为大于 1 的值才能生效。 看Partitioning Support
.默认值:null。
- partitionKeyExtractorName (分区密钥提取器名称)
-
实现的 Bean 的名称
PartitionKeyExtractorStrategy
.用于提取用于计算的密钥 分区 ID(参见 'partitionSelector*')。与 'partitionKeyExpression' 互斥。默认值:null。
- partitionSelectorName (分区选择器名称)
-
实现的 Bean 的名称
PartitionSelectorStrategy
.用于确定基于分区 ID 的 在分区键上(参见 'partitionKeyExtractor*')。与 'partitionSelectorExpression' 互斥。默认值:null。
- partitionSelector表达式
-
用于自定义分区选择的 SpEL 表达式。 如果两者都未设置,则将分区选择为
hashCode(key) % partitionCount
哪里key
通过partitionKeyExpression
.违约:
null
. - 分区计数
-
数据的目标分区数(如果启用了分区)。 如果生产者已分区,则必须设置为大于 1 的值。 在卡夫卡上,它被解释为一种提示。改用此值和目标主题的分区计数中的较大者。
违约:
1
. - 必需组
-
以逗号分隔的组列表,生产者必须确保消息传递到这些组,即使它们在创建后启动(例如,通过在 RabbitMQ 中预先创建持久队列)。
- headerMode
-
当设置为
none
,它会禁用输出上的标头嵌入。 它仅对本机不支持消息标头且需要标头嵌入的消息传递中间件有效。 当不支持本机标头时,当为非 Spring Cloud Stream 应用程序生成数据时,此选项非常有用。 当设置为headers
,它使用中间件的原生标头机制。 当设置为embeddedHeaders
,它将标头嵌入到消息有效负载中。默认值:取决于活页夹实现。
- 使用原生编码
-
当设置为
true
,则出站消息由客户端库直接序列化,必须相应地配置(例如,设置适当的 Kafka 生产者值序列化器)。 使用此配置时,出站消息封送不是基于contentType
的绑定。 使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka 使用者值反序列化程序)来反序列化入站消息。 此外,当使用本机编码和解码时,headerMode=embeddedHeaders
属性被忽略,并且标头不会嵌入到消息中。 查看消费者属性useNativeDecoding
.违约:
false
. - 错误通道启用
-
当设置为 true 时,如果 Binder 支持异步发送结果,则发送失败将发送到目标的错误通道。有关详细信息,请参阅错误处理。
默认值:false。
高级生产者配置
在某些情况下,生产者属性不足以在绑定器中正确配置生成的 MessageHandler,或者您可能更喜欢编程方法
同时配置此类生成的 MessageHandler。不管是什么原因,spring-cloud-stream 都提供了ProducerMessageHandlerCustomizer
来完成它。
@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Configure a {@link MessageHandler} that is being created by the binder for the
* provided destination name.
* @param handler the {@link MessageHandler} from the binder.
* @param destinationName the bound destination name.
*/
void configure(H handler, String destinationName);
}
如您所见,它使您可以访问生产的实际实例MessageHandler
您可以根据需要进行配置。您需要做的就是提供此策略的实现并将其配置为@Bean
.
内容类型协商
数据转换是任何消息驱动微服务架构的核心特性之一。鉴于在 Spring Cloud Stream 中,此类数据表示为 SpringMessage
,则消息在到达目的地之前可能必须转换为所需的形状或大小。这是必需的,原因有两个:
-
转换传入消息的内容以匹配应用程序提供的处理程序的签名。
-
将传出消息的内容转换为有线格式。
线材格式通常为byte[]
(Kafka 和 Rabbit 绑定器也是如此),但它受绑定器实现的约束。
在 Spring Cloud Stream 中,消息转换是通过org.springframework.messaging.converter.MessageConverter
.
作为对后续详细信息的补充,您可能还想阅读以下博客文章。 |
Mechanics
为了更好地理解内容类型协商背后的机制和必要性,我们以以下消息处理程序为例,来看看一个非常简单的用例:
public Function<Person, String> personFunction {..}
为简单起见,我们假设这是应用程序中唯一的处理程序函数(我们假设没有内部管道)。 |
前面示例中显示的处理程序需要一个Person
object 作为参数并生成一个String
类型作为输出。
为了让框架成功通过传入的Message
作为此处理程序的参数,它必须以某种方式转换Message
类型从线格式转换为Person
类型。
换句话说,框架必须找到并应用适当的MessageConverter
.
为了实现这一点,框架需要用户的一些指令。
其中一条指令已由处理程序方法本身的签名(Person
类型)。
因此,从理论上讲,这应该(并且在某些情况下)足够了。
但是,对于大多数用例,为了选择适当的MessageConverter
,框架需要额外的信息。
缺失的那块是contentType
.
Spring Cloud Stream 提供了三种机制来定义contentType
(按优先顺序排列):
-
HEADER:该
contentType
可以通过消息本身进行交流。通过提供contentType
header 时,您可以声明要用于查找和应用相应内容类型MessageConverter
. -
BINDING:该
contentType
可以通过设置spring.cloud.stream.bindings.input.content-type
财产。这 input
segment 对应于目标的实际名称(在本例中为“输入”)。此方法允许你在每个绑定的基础上声明用于查找和应用相应内容类型的内容类型MessageConverter
. -
默认值:如果
contentType
不存在于Message
header 或绑定,默认application/json
内容类型用于 找到并应用适当的MessageConverter
.
如前所述,前面的列表还演示了平局时的优先顺序。例如,标头提供的内容类型优先于任何其他内容类型。 这同样适用于按绑定设置的内容类型,这实质上允许您覆盖默认内容类型。 但是,它还提供了一个合理的默认值(这是根据社区反馈确定的)。
制作的另一个原因application/json
默认源于分布式微服务架构驱动的互作性需求,其中生产者和消费者不仅在不同的 JVM 中运行,还可以在不同的非 JVM 平台上运行。
当非 void 处理程序方法返回时,如果返回值已经是Message
那Message
成为有效载荷。但是,当返回值不是Message
,新的Message
在继承时使用返回值作为有效负载构造
来自输入的标头Message
减去定义或过滤的标头SpringIntegrationProperties.messageHandlerNotPropagatedHeaders
.
默认情况下,那里只设置了一个标头:contentType
.这意味着新的Message
没有contentType
header 设置,从而确保contentType
可以进化。
您可以随时选择不返回Message
从处理程序方法中,您可以在其中注入任何您想要的标头。
如果存在内部管道,则Message
通过相同的转换过程发送到下一个处理程序。但是,如果没有内部管道或您已到达管道的末尾,则Message
被发送回输出目标。
内容类型与参数类型
如前所述,对于框架选择合适的MessageConverter
,它需要参数类型,以及(可选)内容类型信息。
选择适当MessageConverter
驻留在参数解析器 (HandlerMethodArgumentResolvers
),这在调用用户定义的处理程序方法之前触发(即框架已知实际参数类型时)。
如果参数类型与当前有效负载的类型不匹配,则框架会委托给
预配置MessageConverters
看看其中任何一个是否可以转换有效负载。
如您所见,该Object fromMessage(Message<?> message, Class<?> targetClass);
MessageConverter 的作需要targetClass
作为其论点之一。
该框架还确保提供的Message
始终包含一个contentType
页眉。
当不存在 contentType 标头时,它会注入 per-bindingcontentType
header 或默认的contentType
页眉。
的组合contentType
参数类型是框架确定消息是否可以转换为目标类型的机制。
如果不合适MessageConverter
,则会抛出异常,您可以通过添加自定义MessageConverter
(参见User-defined Message Converters
).
但是,如果有效负载类型与处理程序方法声明的目标类型匹配怎么办?在这种情况下,没有什么要转换的,并且
有效负载未修改地传递。虽然这听起来非常简单和合乎逻辑,但请记住,采用Message<?>
或Object
作为论据。
通过将目标类型声明为Object
(这是一个instanceof
Java 中的所有内容),您基本上放弃了转换过程。
不期望Message 仅基于contentType .
请记住,contentType 与目标类型互补。
如果您愿意,可以提供提示,即MessageConverter 可能会考虑也可能不会考虑。 |
消息转换器
MessageConverters
定义两个方法:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
了解这些方法的契约及其用法非常重要,特别是在 Spring Cloud Stream 的上下文中。
这fromMessage
方法将传入的Message
设置为参数类型。
的有效负载Message
可以是任何类型,它是
直到实际实施MessageConverter
以支持多种类型。
例如,某些 JSON 转换器可能支持有效负载类型为byte[]
,String
,等。
当应用程序包含内部管道(即输入→ handler1 → handler2 →时,这一点很重要。→输出),上游处理程序的输出会产生Message
可能不是初始线格式。
但是,toMessage
方法具有更严格的契约,并且必须始终转换Message
到线格式:byte[]
.
因此,出于所有意图和目的(尤其是在实现您自己的转换器时),您将这两种方法视为具有以下签名:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);
提供的 MessageConverters
如前所述,该框架已经提供了MessageConverters
以处理最常见的用例。
以下列表描述了提供的MessageConverters
,按优先级顺序(第一个MessageConverter
使用有效的方法):
-
JsonMessageConverter
:顾名思义,它支持转换Message
to/from POJO 适用于以下情况contentType
是application/json
(默认)。 -
ByteArrayMessageConverter
:支持转换Message
从byte[]
自byte[]
适用于以下情况contentType
是application/octet-stream
.它本质上是一个直通,主要是为了向后兼容性而存在的。 -
ObjectStringMessageConverter
:支持将任何类型转换为String
什么时候contentType
是text/plain
. 它调用 Object 的toString()
方法,或者,如果有效负载是byte[]
,新的String(byte[])
.
如果找不到合适的转换器,框架会引发异常。发生这种情况时,您应该检查您的代码和配置并确保您没有遗漏任何内容(也就是说,确保您提供了contentType
通过使用绑定或标头)。
但是,您很可能发现了一些不常见的情况(例如自定义contentType
也许)和当前提供的堆栈MessageConverters
不知道如何转换。如果是这种情况,您可以添加自定义MessageConverter
.请参阅用户定义的消息转换器。
用户定义的消息转换器
Spring Cloud Stream 公开了一种机制来定义和注册额外的MessageConverter
s.
要使用它,请实现org.springframework.messaging.converter.MessageConverter
,将其配置为@Bean
.
然后将其附加到MessageConverter
s.
重要的是要了解习惯MessageConverter 实现被添加到现有堆栈的头部。
因此,定制MessageConverter 实现优先于现有实现,这使您可以覆盖和添加到现有转换器中。 |
以下示例显示如何创建消息转换器 Bean 以支持名为application/bar
:
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
应用间通信
Spring Cloud Stream 支持应用程序之间的通信。应用程序间通信是一个复杂的问题,涉及多个问题,如以下主题所述:
连接多个应用程序实例
虽然 Spring Cloud Stream 使单个 Spring Boot 应用程序可以轻松连接到消息传递系统,但 Spring Cloud Stream 的典型场景是创建多应用程序管道,其中微服务应用程序相互发送数据。 您可以通过关联“相邻”应用程序的输入和输出目标来实现此方案。
假设设计调用时间源应用程序将数据发送到 Log Sink 应用程序。您可以使用名为ticktock
用于两个应用程序中的绑定。
时间源(具有名为output
) 将设置以下属性:
spring.cloud.stream.bindings.output.destination=ticktock
Log Sink(绑定名为input
) 将设置以下属性:
spring.cloud.stream.bindings.input.destination=ticktock
实例索引和实例计数
在扩展 Spring Cloud Stream 应用程序时,每个实例都可以接收有关同一应用程序存在多少其他实例以及它自己的实例索引是什么的信息。
Spring Cloud Stream 通过spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
性能。
例如,如果 HDFS 接收器应用程序有三个实例,则所有三个实例都具有spring.cloud.stream.instanceCount
设置为3
,并且各个应用程序具有spring.cloud.stream.instanceIndex
设置为0
,1
和2
分别。
通过 Spring Cloud Data Flow 部署 Spring Cloud Stream 应用程序时,会自动配置这些属性;当 Spring Cloud Stream 应用程序独立启动时,必须正确设置这些属性。
默认情况下,spring.cloud.stream.instanceCount
是1
和spring.cloud.stream.instanceIndex
是0
.
在纵向扩展方案中,正确配置这两个属性对于解决分区行为(见下文)非常重要,并且某些绑定器(例如 Kafka 绑定器)始终需要这两个属性,以确保数据在多个使用者实例之间正确拆分。
分区
Spring Cloud Stream 中的分区包括两个任务:
配置用于分区的输出绑定
您可以通过设置其中一个且仅设置一个输出绑定来配置输出绑定以发送分区数据partitionKeyExpression
或partitionKeyExtractorName
属性,以及其partitionCount
财产。
例如,下面是有效的典型配置:
spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
根据该示例配置,使用以下逻辑将数据发送到目标分区。
分区键的值是根据partitionKeyExpression
.
这partitionKeyExpression
是根据出站消息计算的 SpEL 表达式(在前面的示例中,它是id
从消息头中提取分区键。
如果 SpEL 表达式不足以满足您的需求,您可以通过提供org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
并将其配置为 bean(通过使用@Bean
注释)。
如果您有多个类型的 beanorg.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
可用,您可以通过使用partitionKeyExtractorName
属性,如以下示例所示:
--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
return new CustomPartitionKeyExtractorClass();
}
在以前版本的 Spring Cloud Stream 中,您可以指定org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 通过将spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass 财产。
从 3.0 版开始,此属性被删除。 |
计算消息键后,分区选择过程将目标分区确定为0
和partitionCount - 1
.
默认计算适用于大多数方案,基于以下公式:key.hashCode() % partitionCount
.
这可以在绑定上自定义,通过设置要针对“键”计算的 SpEL 表达式(通过partitionSelectorExpression
属性)或通过配置org.springframework.cloud.stream.binder.PartitionSelectorStrategy
作为 bean(通过使用 @Bean 注释)。
类似于PartitionKeyExtractorStrategy
,您可以使用spring.cloud.stream.bindings.output.producer.partitionSelectorName
当应用程序上下文中有多个此类型的 bean 可用时,属性,如以下示例所示:
--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
return new CustomPartitionSelectorClass();
}
在以前版本的 Spring Cloud Stream 中,您可以指定org.springframework.cloud.stream.binder.PartitionSelectorStrategy 通过将spring.cloud.stream.bindings.output.producer.partitionSelectorClass 财产。
从 3.0 版开始,此属性被删除。 |
配置用于分区的输入绑定
输入绑定(绑定名称uppercase-in-0
) 配置为通过设置其partitioned
属性,以及instanceIndex
和instanceCount
属性,如以下示例所示:
spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true spring.cloud.stream.instanceIndex=3 spring.cloud.stream.instanceCount=5
这instanceCount
value 表示应在其中分区数据的应用程序实例总数。
这instanceIndex
必须是多个实例中的唯一值,值介于0
和instanceCount - 1
.
实例索引可帮助每个应用程序实例识别其从中接收数据的唯一分区。
使用不支持本机分区的技术的活页夹需要它。
例如,对于 RabbitMQ,每个分区都有一个队列,队列名称包含实例索引。
使用 Kafka,如果autoRebalanceEnabled
是true
(默认),Kafka 负责跨实例分发分区,并且不需要这些属性。
如果autoRebalanceEnabled
设置为 false,则instanceCount
和instanceIndex
被绑定器用来确定实例订阅的分区(必须至少与实例数量一样多的分区)。
绑定器分配分区而不是 Kafka。
如果您希望特定分区的消息始终转到同一实例,这可能很有用。
当 Binder 配置需要它们时,必须正确设置这两个值,以确保使用所有数据并确保应用程序实例接收互斥的数据集。
虽然在独立情况下使用多个实例进行分区数据处理的场景设置起来可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值并让您依赖运行时基础设施来提供有关实例索引和实例计数的信息来显着简化该过程。
测试
Spring Cloud Stream 支持在不连接到消息传递系统的情况下测试您的微服务应用程序。
Spring Integration 测试活页夹
Spring Cloud Stream 附带了一个测试绑定器,您可以使用它来测试各种应用程序组件,而无需实际的实际绑定器实现或消息代理。
该测试绑定器充当单元测试和集成测试之间的桥梁,并基于 Spring Integration 框架作为 JVM 内消息代理,本质上为您提供两全其美的体验 - 一个没有网络的真正绑定器。
测试活页夹配置
要启用 Spring Integration 测试绑定器,您只需将其添加为依赖项。
添加所需的依赖项
以下是所需 Maven POM 条目的示例。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
或者对于 build.gradle.kts
testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")
测试活页夹用法
现在,您可以将微服务作为简单的单元测试进行测试
@SpringBootTest
public class SampleStreamTests {
@Autowired
private InputDestination input;
@Autowired
private OutputDestination output;
@Test
public void testEmptyConfiguration() {
this.input.send(new GenericMessage<byte[]>("hello".getBytes()));
assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
}
@SpringBootApplication
@Import(TestChannelBinderConfiguration.class)
public static class SampleConfiguration {
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
}
}
如果您需要更多控制或想要在同一测试套件中测试多个配置 您还可以执行以下作:
@EnableAutoConfiguration
public static class MyTestConfiguration {
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
}
. . .
@Test
public void sampleTest() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
MyTestConfiguration.class))
.run("--spring.cloud.function.definition=uppercase")) {
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
source.send(new GenericMessage<byte[]>("hello".getBytes()));
assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
}
}
对于您有多个绑定和/或多个输入和输出的情况,或者只是想明确
您要发送或接收的目的地,send()
和receive()
方法InputDestination
和OutputDestination
被覆盖,以允许您提供输入和输出目标的名称。
请考虑以下示例:
@EnableAutoConfiguration
public static class SampleFunctionConfiguration {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
和实际测试
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
对于具有其他映射属性(例如destination
您应该使用这些名称。例如,考虑不同版本的
前面的测试,其中我们显式映射了uppercase
function 设置为myInput
和myOutput
绑定名称:
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleFunctionConfiguration.class))
.run(
"--spring.cloud.function.definition=uppercase;reverse",
"--spring.cloud.stream.bindings.uppercase-in-0.destination=myInput",
"--spring.cloud.stream.bindings.uppercase-out-0.destination=myOutput"
)) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "myInput");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "myOutput");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
测试 Binder 和 PollableMessageSource
Spring Integration Test Binder 还允许您在使用PollableMessageSource
(有关更多详细信息,请参阅使用轮询的消费者)。
不过,需要理解的重要一点是,民意调查不是事件驱动的,而且PollableMessageSource
是一种策略,它公开了生成(轮询)消息(单数)的作。
轮询的频率或使用的线程数或轮询的位置(消息队列或文件系统)完全取决于您;
换句话说,您有责任配置轮询器或线程或消息的实际源。幸运的是,Spring 有很多抽象来配置它。
让我们看一下例子:
@Test
public void samplePollingTest() {
ApplicationContext context = new SpringApplicationBuilder(SamplePolledConfiguration.class)
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false", "--spring.cloud.stream.pollable-source=myDestination");
OutputDestination destination = context.getBean(OutputDestination.class);
System.out.println("Message 1: " + new String(destination.receive().getPayload()));
System.out.println("Message 2: " + new String(destination.receive().getPayload()));
System.out.println("Message 3: " + new String(destination.receive().getPayload()));
}
@Import(TestChannelBinderConfiguration.class)
@EnableAutoConfiguration
public static class SamplePolledConfiguration {
@Bean
public ApplicationRunner poller(PollableMessageSource polledMessageSource, StreamBridge output, TaskExecutor taskScheduler) {
return args -> {
taskScheduler.execute(() -> {
for (int i = 0; i < 3; i++) {
try {
if (!polledMessageSource.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
output.send("myOutput", newPayload);
})) {
Thread.sleep(2000);
}
}
catch (Exception e) {
// handle failure
}
}
});
};
}
}
上面(非常基本的)示例将在 2 秒的间隔内生成 3 条消息,将它们发送到Source
此活页夹发送到哪个OutputDestination
我们检索它们的地方(对于任何断言)。
目前,它打印以下内容:
Message 1: POLLED DATA
Message 2: POLLED DATA
Message 3: POLLED DATA
如您所见,数据是相同的。这是因为此绑定器定义了实际MessageSource
- 来源
使用poll()
操作。虽然对于大多数测试方案来说已经足够了,但在某些情况下,你可能需要
定义您自己的MessageSource
.为此,只需配置一个类型为MessageSource
在测试配置中提供您自己的
消息溯源的实现。
下面是示例:
@Bean
public MessageSource<?> source() {
return () -> new GenericMessage<>("My Own Data " + UUID.randomUUID());
}
渲染以下输出;
Message 1: MY OWN DATA 1C180A91-E79F-494F-ABF4-BA3F993710DA
Message 2: MY OWN DATA D8F3A477-5547-41B4-9434-E69DA7616FEE
Message 3: MY OWN DATA 20BF2E64-7FF4-4CB6-A823-4053D30B5C74
不要命名这个豆子messageSource 因为它将与同名(不同类型)的 bean 发生冲突
由 Spring Boot 出于不相关的原因提供。 |
关于混合测试活页夹和常规中间件活页夹进行测试的特别说明
提供基于 Spring Integration 的测试绑定器来测试应用程序,而不涉及实际的基于中间件的绑定器,例如 Kafka 或 RabbitMQ 绑定器。 如上一节所述,测试绑定器可帮助您依靠内存中的 Spring Integration 通道快速验证应用程序行为。 当测试绑定器存在于测试类路径上时,Spring Cloud Stream将尝试将此绑定器用于所有测试目的,只要它需要绑定器进行通信。 换句话说,您不能在同一模块中混合测试绑定器和常规中间件绑定器以进行测试。 在使用测试绑定器测试应用程序后,如果要继续使用实际的中间件绑定器进行进一步的集成测试,建议将那些使用实际绑定器的测试添加到单独的模块中,以便这些测试可以与实际的中间件建立正确的连接,而不是依赖于测试绑定器提供的内存中通道。
健康指标
Spring Cloud Stream 为绑定器提供了一个健康指示器。
它以binders
并且可以通过设置management.health.binders.enabled
财产。
要启用健康检查,您首先需要通过包含其依赖项来启用“web”和“actuator”(请参阅绑定可视化和控制)
如果management.health.binders.enabled
未由应用程序显式设置,则management.health.defaults.enabled
匹配为true
并且活页夹运行状况指示器已启用。
如果您想完全禁用健康指示器,则必须将management.health.binders.enabled
自false
.
您可以使用 Spring Boot 执行器运行状况端点来访问运行状况指示器 -/actuator/health
.
默认情况下,只有在到达上述端点时,才会收到顶级应用程序状态。
为了从活页夹特定运行状况指标中接收完整详细信息,您需要包含属性management.endpoint.health.show-details
与值ALWAYS
在您的应用程序中。
运行状况指标特定于活页夹,某些活页夹实现可能不一定提供运行状况指示器。
如果要完全禁用所有现成的运行状况指示器,而是提供自己的运行状况指示器,
您可以通过设置属性来做到这一点management.health.binders.enabled
自false
然后提供您自己的HealthIndicator
bean 的 bean。
在这种情况下,Spring Boot 中的健康指标基础设施仍将选择这些自定义 Bean。
即使您没有禁用活页夹运行状况指示器,您仍然可以通过提供自己的运行状况检查来增强运行状况检查HealthIndicator
bean 除了开箱即用的健康检查之外。
当您在同一应用程序中有多个绑定器时,默认情况下会启用运行状况指示器,除非应用程序通过设置management.health.binders.enabled
自false
.
在这种情况下,如果用户想要禁用绑定器子集的运行状况检查,则应通过将management.health.binders.enabled
自false
在多活页夹配置的环境中。
有关如何提供特定于环境的属性的详细信息,请参阅连接到多个系统。
如果类路径中存在多个绑定器,但并非所有绑定器都在应用程序中使用,这可能会在运行状况指示器的上下文中导致一些问题。
可能有关于如何执行运行状况检查的特定实施细节。例如,Kafka 绑定器可以将状态确定为DOWN
如果活页夹没有注册的目的地。
让我们举一个具体的情况。假设您在类路径中同时存在 Kafka 和 Kafka Streams 绑定器,但仅在应用程序代码中使用 Kafka Streams 绑定器,即仅使用 Kafka Streams 绑定器提供绑定。由于未使用 Kafka 绑定器,并且它有特定的检查来查看是否注册了任何目标,因此绑定器运行状况检查将失败。顶级应用程序运行状况检查状态将报告为DOWN
. 在这种情况下,您可以简单地从应用程序中删除 kafka binder 的依赖项,因为您没有使用它。
Samples
有关 Spring Cloud Stream 示例,请参阅 GitHub 上的 spring-cloud-stream-samples 存储库。
在CloudFoundry上部署流应用程序
在 CloudFoundry 上,服务通常通过名为 VCAP_SERVICES 的特殊环境变量公开。
配置绑定器连接时,您可以使用环境变量中的值,如数据流 Cloud Foundry Server 文档中所述。
活页夹实现
以下是可用活页夹实现的列表
前面已经提到,Binder 抽象也是框架的扩展点之一。因此,如果您在前面的列表中找不到合适的 binder,您可以在 Spring Cloud Stream 之上实现自己的 binder。
在如何从头开始创建 Spring Cloud Stream Binder 发布社区成员文档中
详细介绍,通过示例,实现自定义活页夹所需的一组步骤。
这些步骤也在Implementing Custom Binders
部分。