对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

Spring 数据集成之旅简史

Spring 的数据集成之旅始于 Spring Integration。凭借其编程模型,它提供了一致的开发人员体验来构建可以采用企业集成模式以与外部系统(例如数据库、消息代理等)连接的应用程序。spring-doc.cadn.net.cn

快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,开发基于 Spring 的独立生产级微服务变得无缝。spring-doc.cadn.net.cn

为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被放到一个新项目中。Spring Cloud Stream 诞生了。spring-doc.cadn.net.cn

借助 Spring Cloud Stream,开发人员可以:spring-doc.cadn.net.cn

  • 单独构建、测试和部署以数据为中心的应用程序。spring-doc.cadn.net.cn

  • 应用现代微服务体系结构模式,包括通过消息传递进行组合。spring-doc.cadn.net.cn

  • 将应用程序职责与以事件为中心的思维解耦。事件可以表示及时发生的事情,下游消费者应用程序可以在不知道其起源或生产者身份的情况下对此做出反应。spring-doc.cadn.net.cn

  • 将业务逻辑移植到消息代理(例如 RabbitMQ、Apache Kafka、Amazon Kinesis)上。spring-doc.cadn.net.cn

  • 依靠框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。spring-doc.cadn.net.cn

  • 还有很多......spring-doc.cadn.net.cn

快速入门

按照此三步指南,您可以在不到 5 分钟的时间内试用 Spring Cloud Stream,甚至在进入任何细节之前。spring-doc.cadn.net.cn

我们将向您展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自您选择的消息传递中间件的消息(稍后会详细介绍),并将收到的消息记录到控制台。 我们称之为LoggingConsumer. 虽然不是很实用,但它很好地介绍了一些主要概念 和抽象,使消化本用户指南的其余部分变得更加容易。spring-doc.cadn.net.cn

这三个步骤如下:spring-doc.cadn.net.cn

使用 Spring Initializr 创建示例应用程序

要开始使用,请访问 Spring Initializr。从那里,您可以生成我们的LoggingConsumer应用。为此,请执行以下作:spring-doc.cadn.net.cn

  1. “依赖项”部分中,开始键入stream. 当出现“云流”选项时,选择它。spring-doc.cadn.net.cn

  2. 开始输入“kafka”或“rabbit”。spring-doc.cadn.net.cn

  3. 选择“Kafka”或“RabbitMQ”。spring-doc.cadn.net.cn

    基本上,您可以选择应用程序绑定到的消息传递中间件。 我们建议使用您已经安装的或对安装和运行感觉更舒服的那个。 此外,正如您从初始化器屏幕中看到的那样,您还可以选择其他一些选项。 例如,您可以选择 Gradle 作为构建工具,而不是 Maven(默认)。spring-doc.cadn.net.cn

  4. “项目”字段中,键入“logging-consumer”。spring-doc.cadn.net.cn

    “项目”字段的值将成为应用程序名称。 如果您选择了 RabbitMQ 作为中间件,则您的 Spring Initializr 现在应该如下所示:spring-doc.cadn.net.cn

弹簧初始化
  1. 单击“生成项目”按钮。spring-doc.cadn.net.cn

    这样做会将生成项目的压缩版本下载到您的硬盘驱动器。spring-doc.cadn.net.cn

  2. 将文件解压缩到要用作项目目录的文件夹中。spring-doc.cadn.net.cn

我们鼓励您探索 Spring Initializr 中可用的多种可能性。 它允许您创建许多不同类型的 Spring 应用程序。

将项目导入 IDE

现在,您可以将项目导入 IDE。 请记住,根据 IDE,您可能需要遵循特定的导入过程。 例如,根据项目的生成方式(Maven 或 Gradle),您可能需要遵循特定的导入过程(例如,在 Eclipse 或 STS 中,您需要使用文件→导入→ Maven →现有 Maven 项目)。spring-doc.cadn.net.cn

导入后,项目不得有任何类型的错误。也src/main/java应包含com.example.loggingconsumer.LoggingConsumerApplication.spring-doc.cadn.net.cn

从技术上讲,此时,您可以运行应用程序的主类。 它已经是一个有效的 Spring Boot 应用程序。 但是,它不执行任何作,因此我们想添加一些代码。spring-doc.cadn.net.cn

添加消息处理程序、构建和运行

修改com.example.loggingconsumer.LoggingConsumerApplication类如下所示:spring-doc.cadn.net.cn

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

从前面的列表中可以看出:spring-doc.cadn.net.cn

这样做还可以让您看到该框架的核心功能之一:它尝试自动将传入消息有效负载转换为类型Person.spring-doc.cadn.net.cn

您现在拥有了一个功能齐全的 Spring Cloud Stream 应用程序,它可以监听消息。 从这里开始,为了简单起见,我们假设您在第一步中选择了 RabbitMQ。 假设您已安装并运行 RabbitMQ,您可以通过运行其main方法。spring-doc.cadn.net.cn

您应该会看到以下输出:spring-doc.cadn.net.cn

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

进入 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,向input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg. 这anonymous.CbMIwdkJSBO1ZoPDOtHtCgpart 表示组名称并生成,因此它在您的环境中必然会有所不同。 对于更可预测的内容,您可以通过将spring.cloud.stream.bindings.input.group=hello(或任何你喜欢的名字)。spring-doc.cadn.net.cn

消息的内容应该是Person类,如下所示:spring-doc.cadn.net.cn

{"name":"Sam Spade"}

然后,在控制台中,您应该会看到:spring-doc.cadn.net.cn

Received: Sam Spadespring-doc.cadn.net.cn

您还可以将应用程序构建并打包到引导 jar 中(通过使用./mvnw clean install),然后使用java -jar命令。spring-doc.cadn.net.cn

现在您有一个工作(尽管非常基本)的 Spring Cloud Stream 应用程序。spring-doc.cadn.net.cn

流数据上下文中的 Spring 表达式语言 (SpEL)

在本参考手册中,您将遇到许多可以使用 Spring 表达式语言 (SpEL) 的功能和示例。在使用它时了解某些限制非常重要。spring-doc.cadn.net.cn

SpEL 允许您访问当前消息以及您正在运行的应用程序上下文。 但是,了解 SpEL 可以看到什么类型的数据非常重要,尤其是在传入消息的上下文中。 消息以 byte[] 的形式从代理到达。然后将其转换为Message<byte[]>通过活页夹,而你可以看到消息的有效负载保持其原始形式。消息的标头是<String, Object>,其中值通常是另一个基元或基元的集合/数组,因此是 Object。 这是因为 binder 不知道所需的输入类型,因为它无法访问用户代码(函数)。因此,活页夹有效地以邮件标题的形式递送了一个包含有效负载和一些可读元数据的信封,就像通过邮件递送的信件一样。 这意味着,虽然可以访问消息的有效负载,但您只能将其作为原始数据(即 byte[])访问。虽然开发人员要求能够以具体类型(例如 Foo、Bar 等)对有效负载对象的字段进行 SpEL 访问可能是很常见的,但您可以看到实现这是多么困难甚至不可能。 这是一个例子来演示这个问题;假设您有一个路由表达式,用于根据有效负载类型路由到不同的函数。此要求意味着有效负载从 byte[] 转换为特定类型,然后应用 SpEL。但是,为了执行这样的转换,我们需要知道要传递给转换器的实际类型,这来自函数的签名,我们不知道是哪个。解决此要求的更好方法是将类型信息作为消息头传递(例如,application/json;type=foo.bar.Baz).您将获得一个清晰可读的字符串值,可以在一年内访问和评估该值,并且易于阅读的 SpEL 表达式。spring-doc.cadn.net.cn

此外,使用有效负载进行路由决策被认为是非常糟糕的做法,因为有效负载被认为是特权数据 - 数据只能由其最终接收者读取。同样,使用邮件递送的类比,您不希望邮递员打开您的信封并阅读信件的内容来做出一些递送决定。同样的概念也适用于此,尤其是在生成消息时相对容易包含此类信息时。它强制执行与要通过网络传输的数据的设计以及此类数据中的哪些部分可以被视为公共数据以及哪些数据具有特权相关的一定级别的纪律。spring-doc.cadn.net.cn