对于最新稳定版本,请使用spring-cloud-stream 5.0.1spring-doc.cadn.net.cn

一个关于Spring数据集成之旅的简短历史记录

Spring 的数据集成之旅始于 Spring Integration。凭借其编程模型,它为构建可以拥抱 企业集成模式 以连接外部系统(例如,数据库、消息代理和其他)的应用程序提供了统一的开发人员体验。spring-doc.cadn.net.cn

快速转向云时代,微服务在企业环境中变得突出。Spring Boot改变了开发人员构建应用程序的方式。有了Spring编程模型和Spring Boot处理的运行时责任,在此基础上开发独立的、适合生产的Spring微服务变得轻而易举。spring-doc.cadn.net.cn

为了扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被整合到了一个新项目中。Spring Cloud Stream诞生了。spring-doc.cadn.net.cn

使用 Spring Cloud Stream,开发人员可以执行以下操作:spring-doc.cadn.net.cn

  • 在隔离中构建、测试和部署以数据为中心的应用程序。spring-doc.cadn.net.cn

  • 应用现代微服务架构模式,包括通过消息传递进行组合。spring-doc.cadn.net.cn

  • 使用以事件为中心的思维解耦应用程序职责。事件可以表示某个时间点发生的事情,下游消费者应用程序可以对它进行响应,而无需知道它的来源或生产者的身份。spring-doc.cadn.net.cn

  • 将业务逻辑移植到消息代理(如RabbitMQ、Apache Kafka、Amazon Kinesis)上。spring-doc.cadn.net.cn

  • 依赖框架对常见场景的自动内容类型支持。可以扩展到不同的数据转换类型。spring-doc.cadn.net.cn

  • 以及更多。 . .spring-doc.cadn.net.cn

快速开始

你可以通过这三步指南,在不深入任何细节的情况下,用不到5分钟就尝试Spring Cloud Stream。spring-doc.cadn.net.cn

我们向您展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自您选择的消息中间件(稍后会详细介绍)的消息,并将接收到的消息记录到控制台。 我们将其称为 LoggingConsumer。 虽然不是很实用,但它为一些主要概念和抽象提供了一个良好的入门介绍,使您更容易理解本用户指南的其余部分。spring-doc.cadn.net.cn

步骤如下:spring-doc.cadn.net.cn

使用 Spring Initializr 创建示例应用程序

要开始使用,请访问 Spring Initializr。从那里,您可以生成我们的 LoggingConsumer 应用程序。为此:spring-doc.cadn.net.cn

  1. 依赖项 部分,开始输入 stream。 当出现“Cloud Stream”选项时,请选择它。spring-doc.cadn.net.cn

  2. 输入框中开始输入单词,会根据单词自动显示相关的文档内容。spring-doc.cadn.net.cn

  3. 选择“Kafka”或“RabbitMQ”。spring-doc.cadn.net.cn

    基本来说,您选择您的应用程序绑定的消息中间件。 我们建议使用您已经安装的中间件,或者更熟悉安装和运行的中间件。 同样,正如初始化器屏幕所示,您还可以选择其他一些选项进行选择。 例如,您可以选择 Gradle 作为构建工具,而不是默认的 Maven。spring-doc.cadn.net.cn

  4. 在< strong >项目ID字段中,输入“logging-consumer”。spring-doc.cadn.net.cn

    Artifact 字段的值将成为应用程序名称。 如果你选择了 RabbitMQ 作为中间件,那么你的 Spring Initializr 应该现在如下所示:spring-doc.cadn.net.cn

spring initializr
  1. 点击 生成项目 按钮。spring-doc.cadn.net.cn

    这样做会将生成项目的压缩包下载到您的硬盘上。spring-doc.cadn.net.cn

  2. 解压缩文件到你想用作项目目录的文件夹。spring-doc.cadn.net.cn

我们鼓励您探索 Spring Initializr 提供的多种可能性。 它可以让您创建许多不同类型的 Spring 应用程序。

导入项目到您的IDE

现在您可以将项目导入到您的IDE中。请注意,这取决于IDE,可能需要遵循特定的导入过程。例如,无论项目是如何生成(Maven或Gradle),您可能需要遵循特定的导入过程(例如,在Eclipse或STS中,您需要使用“文件”→“导入”→“Maven”→“现有Maven项目”。)spring-doc.cadn.net.cn

导入后,项目不应有任何错误。此外,src/main/java 应包含 com.example.loggingconsumer.LoggingConsumerApplicationspring-doc.cadn.net.cn

从技术上讲,在这一点上,您可以运行主类的应用程序。</p> <p>它已经是有效的 Spring Boot 应用程序。</p> <p>但是,它什么也不做,所以我们要添加一些代码。spring-doc.cadn.net.cn

添加消息处理器、生成和运行

修改 com.example.loggingconsumer.LoggingConsumerApplication 类,使其看起来如下所示:spring-doc.cadn.net.cn

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

正如前面的代码示例中所示:spring-doc.cadn.net.cn

这样做还可以看到框架的核心功能之一:它试图自动将传入的消息有效负载转换为类型Personspring-doc.cadn.net.cn

您现在拥有了一个功能齐全的Spring Cloud Stream应用程序,它侦听消息。从这里开始,为了简单起见,我们假设您在第一步中选择了RabbitMQ。假设您已安装并运行了RabbitMQ,您可以通过在IDE中运行其main方法来启动该应用。spring-doc.cadn.net.cn

您应该看到以下输出:spring-doc.cadn.net.cn

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到RabbitMQ管理控制台或其他任何RabbitMQ客户端,并向input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg发送消息。代码anonymous.CbMIwdkJSBO1ZoPDOtHtCg部分表示组名称,因此在您的环境中可能不同。要使用更可预测的内容,请通过设置spring.cloud.stream.bindings.input.group=hello(或您喜欢的任何名称)使用显式组名称。spring-doc.cadn.net.cn

消息内容应该是Person类的JSON表示形式,如下所示:spring-doc.cadn.net.cn

{"name":"Sam Spade"}

然后,在您的控制台中,您应该看到:spring-doc.cadn.net.cn

Received: Sam Spadespring-doc.cadn.net.cn

您还可以通过使用./mvnw clean install构建和打包应用程序,然后使用java -jar命令运行构建的JAR。spring-doc.cadn.net.cn

现在,您拥有了一个工作(尽管非常基础)的Spring Cloud Stream应用程序。spring-doc.cadn.net.cn

Spring 表达式语言(SpEL)在流式数据的上下文中

在整个参考手册中,您会遇到许多使用Spring表达式语言(SpEL)的功能和示例。了解它在使用时的一些限制是很重要的。spring-doc.cadn.net.cn

SpEL 给你访问当前消息以及你正在运行的应用程序上下文的权限。您需要了解 SpEL 可以查看哪种类型的数据,尤其是在传入消息的上下文中。这很重要。从代理接收到的消息是以字节数组(byte[])的形式到达。It is then transformed to a 0 by the binders whereas you can see the payload of the message maintains its raw form. 消息的头部是 <String, Object>,其中值通常为另一个基本类型或基本类型的集合/数组,因此是 Object。它是因为绑定器不知道所需的输入类型,因为它无法访问用户代码(函数)。所以,消息传递有效地将包含有效负载和消息头中可读元数据的消息传递给绑定器,就像信件通过邮件传递一样。这意味著虽然可以访问消息的有效内容,但您只能将其作为原始数据访问(即。e., byte[].)而且对于开发人员来说,要求能够以具体类型访问有效负载对象的字段的SpEL访问可能是非常常见的(例如。g., Foo, Bar 等等),你可以想象这样做会有多困难,甚至是不可能的。这里有一个示例来演示问题;想象你有一个路由表达式,可以根据payload类型将请求路由到不同的函数。此要求将意味着从字节数组到特定类型的有效载荷转换,然后应用SpEL。(但是,为了执行这种转换,我们需要知道要传递给转换器的实际类型,这来自于我们不知道哪一个的函数签名。)一种更好的解决此需求的方法是将类型信息作为消息头传递 (e.g., application/json;type=foo.bar.Baz ).你将获得一个清晰易读的String值,该值可以被访问并在一年内轻松评估的SpEL表达式使用。spring-doc.cadn.net.cn

(另外,不建议使用有效负载进行路由决策,因为有效负载被视为特权数据——只有最终接收方才能读取的数据。再次使用邮件递送类比,您不希望邮差打开您的信封并阅读信函的内容来做出一些递送决策。同样的概念也适用,尤其是在生成消息时包含此类信息相对容易的情况下。这强制执行与要在网络上传输的数据的设计相关的某些纪律,以及哪些数据可以被认为是公开的,哪些是特权的)spring-doc.cadn.net.cn