|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
消息的生产和消费
您可以通过编写函数并将其作为 @Bean s 来编写一个 Spring Cloud Stream 应用程序。
您也可以使用基于注解的 Spring Integration 配置或
基于注解的 Spring Cloud Stream 配置,尽管从 spring-cloud-stream 3.x 开始
我们推荐使用函数式实现。
支持 Spring Cloud 函数
概述
从 Spring Cloud Stream v2.1 开始,定义 流处理器 和 源 的另一种选择是使用内置的 Spring Cloud Function 支持,其中它们可以表示为类型为 java.util.function.[Supplier/Function/Consumer] 的 bean。
要指定绑定到由绑定公开的外部目标的功能性 bean,您必须提供spring.cloud.function.definition属性。
In the event you only have single bean of type java.util.function.[Supplier/Function/Consumer], you can
skip the spring.cloud.function.definition property, since such functional bean will be auto-discovered. However,
it is considered best practice to use such property to avoid any confusion.
Some time this auto-discovery can get in the way, since single bean of type java.util.function.[Supplier/Function/Consumer]
could be there for purposes other then handling messages, yet being single it is auto-discovered and auto-bound.
For these rare scenarios you can disable auto-discovery by providing spring.cloud.stream.function.autodetect property with value set to false. |
这里是将消息处理器作为 java.util.function.Function 公开的应用程序示例,通过充当数据的消费者和生产者,有效地支持
传递 语义。
@SpringBootApplication
public class MyFunctionBootApp {
public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class);
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}
在前面的例子中,我们定义了一个类型为 java.util.function.Function 的 bean,称为 toUpperCase,用作消息处理器
其 'input' 和 'output' 必须绑定到由提供的目标绑定器公开的外部目的地。
默认情况下,'input' 和 'output' 绑定名称将是 toUpperCase-in-0 和 toUpperCase-out-0。
请参阅功能绑定名称部分以了解用于建立绑定名称的命名约定详情。
以下是支持其他语义的简单功能应用程序的示例:
这是作为java.util.function.Supplier暴露的source语义的一个示例
@SpringBootApplication
public static class SourceFromSupplier {
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}
以下是作为java.util.function.Consumer暴露的sink semantics的示例
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
提供商(源)
Function 和 Consumer 在调用触发方式上非常直接。它们是基于发送到绑定的目标的数据(事件)来触发的。换句话说,它们是经典的事件驱动组件。
然而,Supplier在其触发方面属于一个独立的类别。因为根据定义,它是数据的源(起点),所以它不订阅任何入站目标,并且必须通过其他机制来触发。
另外还有一个关于Supplier实现的问题,它可以是指令式或响应式的,这直接与这类提供商的触发相关。
考虑以下示例:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
前面的Supplier Bean 在每次调用其get()方法时都会生成一个字符串。但是,谁来调用此方法以及调用频率如何?
框架提供了一个默认轮询机制(回答“谁?”的问题),该机制将触发提供商的调用,默认情况下每秒执行一次(回答“多频繁?”的问题)。
换句话说,上述配置会每秒钟生成一条消息,每条消息发送到由绑定器公开的output目标。轮询配置属性部分了解如何自定义轮询机制。
考虑一个不同的例子:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
前面的 Supplier bean 采用了响应式编程风格。通常情况下,与命令式提供商不同,它应该只触发一次,因为调用其 get() 方法会生成(提供)连续的消息流,而不是单个消息。
该框架能够识别编程风格上的差异,并保证此类提供商仅被触发一次。
然而,设想一种使用场景,你希望轮询某个数据源并返回一个有限的数据流来表示结果集。响应式编程风格是这种Supplier的理想机制。但是,由于产生的数据流具有有限性,这样的Supplier仍然需要定期调用。
考虑下面的示例,该示例通过生成有限的数据流来模拟这种使用情况:
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
该 Bean 本身使用 PollableBean 注解(@Bean 的子集),从而向框架发出信号,表明尽管此类提供商的实现是响应式的,但仍需要轮询。
在PollableBean中定义了splittable属性,该属性向此注解的后处理器发出信号,表明由带注解组件生成的结果必须进行拆分,默认情况下设置为true。这意味着框架会将返回的消息逐个拆分成单独消息发送出去。如果这不是期望的行为,则可以将其设置为false,此时提供商将简单地返回生成的Flux而不进行拆分。 |
提供商与线程
到目前为止,您已经了解到,与由事件触发(具有输入数据)的Function和Consumer不同,Supplier没有任何输入,因此通过不同的机制——轮询器来触发。该机制可能有不可预测的线程处理机制。虽然在大多数情况下,线程处理机制的具体细节对函数的下游执行并不重要,但在某些情况下可能会出现问题,特别是对于那些对线程关联性有一定期望的集成框架。例如,Spring Cloud Sleuth依赖于存储在线程本地的跟踪数据。针对这些情况,我们提供了另一种通过 |
消费者(响应式)
响应式 Consumer 稍微有点特殊,因为它有一个空的返回类型,使得框架无法引用订阅。
你很可能不需要编写 Consumer<Flux<?>>,而是将其写成一个在流上作为最后一个操作符调用 then
操作符的 Function<Flux<?>, Mono<Void>>。
例如:
public Function<Flux<?>, Mono<Void>> consumer() {
return flux -> flux.map(..).filter(..).then();
}
但是,如果您确实需要编写显式的Consumer<Flux<?>>,请记得订阅传入的Flux。
另外,请记住,当混合使用响应式和命令式函数时,同样的规则适用于函数组合。Spring Cloud Function确实支持将响应式函数与命令式函数进行组合,但您必须意识到某些限制。
例如,假设您已经将响应式函数与命令式消费者进行了组合。
这种组合的结果是一个响应式的Consumer。然而,正如本节前面所述,无法订阅这样的消费者,因此只能通过使您的消费者变为响应式并手动订阅(如前所述),或者将您的函数更改为命令式来解决此限制。
轮询配置属性
Spring Cloud Stream公开了以下属性,并使用spring.integration.poller.作为前缀:
- 固定延迟
-
默认轮询器的固定延迟,单位为毫秒。
默认值:1000L。
- 每轮询的最大消息数
-
默认轮询器每次轮询事件的最大消息数。
默认值:1L。
- 定时任务
-
定时器触发器的Cron表达式值。
默认值为 no.
- 初始延迟
-
周期性触发器的初始延迟。
默认值:0。
- 时间单位
-
延迟值应用的时间单位。
默认值:MILLISECONDS。
例如 --spring.integration.poller.fixed-delay=2000 将轮询间隔设置为每两秒轮询一次。
单绑定转导配置
上一节展示了如何配置一个应用于所有绑定的单个默认轮询器。虽然这与微服务模型非常吻合,spring-cloud-stream专为每个微服务代表一个单一组件(例如Supplier)而设计,因此默认轮询器配置已足够;但在某些边缘情况下,您可能有多个需要不同轮询配置的组件。
对于这些情况,请使用每个绑定的方式配置轮询器。例如,假设您有一个输出绑定supply-out-0。在这种情况下,可以使用spring.cloud.stream.bindings.supply-out-0.producer.poller..前缀为该绑定配置轮询器(例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000)。
将任意数据发送到输出(例如外部事件驱动源)
在某些情况下,数据的实际来源可能来自外部(非绑定器)系统。例如,数据源可能是传统的REST端点。我们如何将此类数据源与spring-cloud-stream使用的功能机制联系起来?
Spring Cloud Stream 提供了两种机制,让我们更详细地了解它们
在这里,对于两个示例,我们将使用一个标准的MVC端点方法delegateToSupplier绑定到根web上下文,通过StreamBridge机制将传入请求委托给流。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
在这里,我们自动装配了一个 StreamBridge Bean,它允许我们将数据发送到输出绑定中,从而有效地将非流应用程序与 Spring Cloud Stream 进行连接。请注意,在前面的例子中没有定义任何源函数(例如 Supplier Bean),这使得框架在事先无法创建源绑定(这是配置包含函数 Bean 的典型情况)。不过没关系,因为 StreamBridge 将会在第一次调用其 send(..) 操作时启动创建输出绑定(以及必要时的目的地自动提供)过程,并缓存这些绑定以便后续重用(更多细节请参阅StreamBridge 和动态目的地)。
然而,如果您希望在初始化(启动)时预先创建输出绑定,则可以利用spring.cloud.stream.output-bindings属性,在该属性中声明您的源名称。提供的名称将用作触发器来创建源绑定。;可用于表示多个源(多个输出绑定),例如:--spring.cloud.stream.output-bindings=foo;bar
另外,请注意streamBridge.send(..)方法需要一个Object作为数据。这意味着您可以发送POJO或Message,它在输出时会遵循与任何提供相同一致性级别的Function或Supplier相同的流程。
这表示,即使输出类型转换、分区等也如同来自函数的输出一样被遵守。
| 与显式绑定创建中所解释的不同,StreamBridge在设计时既考虑了性能,又具备根据需要动态创建大量绑定的能力。为了实现这一点,StreamBridge创建的实际绑定不会被缓存在应用程序上下文中,因此无法按照绑定可视化和控制中的说明进行管理。 但是如果你仍然希望使用StreamBridge来动态创建绑定并在之后进行管理,请使用以下机制,在使用StreamBridge之前明确创建绑定——ref:spring-cloud-stream/binding_visualization_control.adocl#_define_new_and_manage_existing_bindings[以编程方式定义新绑定] |
异步发送的 StreamBridge
StreamBridge 使用了 Spring Integration 框架提供的发送机制,该框架是 Spring Cloud Stream 的核心。默认情况下,此机制使用发送者的线程。换句话说,发送是阻塞的。虽然这对许多情况来说是可以接受的,但在某些情况下,您可能希望这种发送是异步的。为此,请在调用其中一个发送方法之前使用 setAsync(true) 方法的 StreamBridge。
异步发送时的可观测性上下文传播
在使用框架提供的可观测性支持以及支持Spring框架时,破坏线程边界会影响可观测性上下文的一致性,从而影响您的跟踪历史。为了避免这种情况,您所需要做的就是添加context-propagation依赖Micrometer(见下文)。
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
<version>1.1.0</version>
</dependency>
流桥接器和动态目标
StreamBridge 还可以用于输出目标未知的情况,类似于消费者路由部分所述的用例。
让我们看一下这个例子
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, args);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("myDestination", body);
}
}
如您所见,前面的例子与前一个非常相似,唯一的区别是通过spring.cloud.stream.output-bindings属性提供了显式的绑定指令(未提供)。这里我们向不存在绑定的myDestination名称发送数据。因此,根据消费者路由部分所述,该名称将被视为动态目标。
在前面的例子中,我们使用ApplicationRunner作为外部源来填充流。
一个更实际的例子,其中外部资源是REST端点。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
streamBridge.send("myBinding", body);
}
}
从 delegateToSupplier 方法内部可以看到,我们正在使用 StreamBridge 将数据发送到 myBinding 绑定。在此过程中,您还可以利用 StreamBridge 的动态特性,如果 myBinding 不存在,则会自动创建并缓存,否则将使用现有的绑定。
缓存动态目标(绑定)可能会导致内存泄漏,特别是在存在大量动态目标的情况下。为了提供一定程度的控制,我们为输出绑定提供了自清除缓存机制,默认缓存大小为10。这意味着如果您的动态目标数量超过该数值,则有可能会清除现有的绑定并需要重新创建,这可能导致性能轻微下降。您可以通过设置spring.cloud.stream.dynamic-destination-cache-size属性来增加缓存大小,并将其设置为您所需的值。 |
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/
通过展示两个示例,我们想强调该方法适用于任何类型的外部源。
如果使用的是 Solace PubSub+ 绑定,Spring Cloud Stream 已保留 scst_targetDestination 标头(可通过 BinderHeaders.TARGET_DESTINATION 获取),允许将消息从绑定配置的目标重定向到由该标头指定的目标目标。这使得绑定能够管理发布到动态目标所需的资源,从而减轻框架的负担,并避免了前一个注释中提到的缓存问题。更多信息 在这里。 |
使用 StreamBridge 输出内容类型
您还可以使用以下方法签名,提供特定的内容类型public boolean send(String bindingName, Object data, MimeType outputContentType)。
或者如果您发送的数据是Message,则会尊重其内容类型。
使用特定绑定程序类型的 StreamBridge
Spring Cloud Stream 支持多种绑定器场景。例如,您可能从 Kafka 接收数据并将其发送到 RabbitMQ。
有关多个绑定器场景的更多信息,请参阅绑定器部分以及特别关注类路径上的多个绑定器
如果您计划使用StreamBridge并且在应用程序中配置了多个绑定器,则还必须告诉StreamBridge使用哪个绑定器。为此,send方法还有两种变体:
public boolean send(String bindingName, @Nullable String binderType, Object data)
public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)
如您所见,您可以提供一个额外的参数binderType,告诉BindingService在创建动态绑定时使用哪个binder。
在使用 spring.cloud.stream.output-bindings 属性或绑定已在不同绑定器下创建的情况下,binderType 参数将不起作用。 |
使用通道拦截器与StreamBridge
由于StreamBridge使用MessageChannel来建立输出绑定,因此当通过StreamBridge发送数据时可以激活通道拦截器。
由应用程序决定在StreamBridge上应用哪些通道拦截器。
除非被标注为@GlobalChannelInterceptor(patterns = "*"),否则Spring Cloud Stream不会将检测到的所有通道拦截器注入StreamBridge。
假设您在应用程序中有以下两个不同的 StreamBridge 绑定。
streamBridge.send("foo-out-0", message);
和
streamBridge.send("bar-out-0", message);
现在,如果您希望在StreamBridge绑定上应用通道拦截器,则可以声明以下GlobalChannelInterceptor bean。
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
然而,如果您不喜欢上述全局方法并希望为每个绑定都有专用拦截器,则可以执行以下操作。
@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
和
@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
您可以根据业务需求灵活地使这些模式更加严格或自定义。
采用这种方法,应用程序能够决定在StreamBridge中注入哪些拦截器,而不是应用所有可用的拦截器。
StreamBridge通过StreamOperations接口提供了一个契约,其中包含了StreamBridge的所有send方法。因此,应用程序可以选择使用StreamOperations进行自动装配。当需要对使用StreamBridge的代码进行单元测试时,这非常方便,因为它可以为StreamOperations接口提供模拟或类似的机制。 |
响应式函数支持
由于Spring Cloud Function建立在Project Reactor之上,因此您无需做太多事情就可以从响应式编程模型中受益,在实现Supplier、Function或Consumer时。
例如:
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
return flux -> flux.map(val -> val.toUpperCase());
}
}
|
选择响应式或命令式编程模型时必须理解一些重要的事项。 完全响应式还是仅API? 使用响应式 API 并不一定意味着您可以利用该 API 的所有响应式特性。换句话说,像背压和其他高级功能只有在与兼容系统(如 Reactive Kafka binder)一起工作时才能发挥作用。如果您正在使用常规的 Kafka 或 Rabbit 或任何其他非响应式绑定器,则只能受益于响应式 API 本身的便利性,而不能受益于其高级功能,因为实际的数据流源或目标并非响应式。 错误处理与重试 在本手册中,您会看到有关基于框架的错误处理、重试和其他功能以及与之相关的配置属性的多个引用。重要的是要明白,它们仅影响命令式函数,在涉及响应式函数时您不应该抱有相同的期望。而且这就是原因。 . .响应式和命令式函数之间存在根本性差异。命令式函数是框架在接收到每条消息时调用的消息处理器。对于N条消息,将会有N次调用此函数,并且由于这个原因我们可以包装这样的函数并添加额外的功能,例如错误处理、重试等。响应式函数是初始化函数。仅调用一次,用于获取用户提供的连接到框架所提供的Flux/Mono的引用。之后,我们(框架)对流就完全没有任何可见性或控制权了。因此,使用响应式函数时,您必须依赖响应式 API 的丰富功能来进行错误处理和重试(即。e., |
函数组合
使用函数式编程模型,您还可以从函数组合中获益,其中可以从一组简单函数动态组合复杂的处理器。作为示例,让我们将以下函数Bean添加到上面定义的应用程序中。
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}
并将spring.cloud.function.definition属性修改为反映您希望从‘toUpperCase’和‘wrapInQuotes’组合出新功能的意图。为此,Spring Cloud Function依赖于|(管道)符号。因此,要完成我们的示例,该属性现在将如下所示:
--spring.cloud.function.definition=toUpperCase|wrapInQuotes
| Spring Cloud Function 提供的功能组合支持的一个巨大优势是,你可以组合响应式函数和命令式函数。 |
组合的结果是一个单一函数,正如您所猜测的那样,它可能有一个很长且相当晦涩的名字(例如,foo|bar|baz|xyz. . .),在其他配置属性方面会带来很大的不便。这就是功能绑定名称部分描述的描述性绑定名称特性可以提供帮助的地方。
例如,如果我们想给我们的 toUpperCase|wrapInQuotes 起一个更具描述性的名字,我们可以使用以下属性 spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput 来实现这一点,并允许其他配置属性引用该绑定名称(例如,spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination)。
函数式组合与横切关注点
函数组合通过将复杂性分解为一组简单、可单独管理/测试的组件,有效地帮助您处理复杂性,这些组件在运行时仍可以表示为一个整体。但这并不是唯一的优点。
您也可以使用组合来解决某些横切的非功能性问题,例如内容增强。例如,假设您收到的消息可能缺少某些标题,或者某些标题并不处于您的业务功能所期望的状态。现在,您可以实现一个单独的功能来解决这些问题,然后将其与主要的业务功能组合起来。
让我们看一下这个例子
@SpringBootApplication
public class DemoStreamApplication {
public static void main(String[] args) {
SpringApplication.run(DemoStreamApplication.class,
"--spring.cloud.function.definition=enrich|echo",
"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
"--spring.cloud.stream.bindings.input.destination=myDestination",
"--spring.cloud.stream.bindings.input.group=myGroup");
}
@Bean
public Function<Message<String>, Message<String>> enrich() {
return message -> {
Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
};
}
@Bean
public Function<Message<String>, Message<String>> echo() {
return message -> {
Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
System.out.println("Incoming message " + message);
return message;
};
}
}
虽然这个例子很简单,但它展示了如何通过一个函数向传入的消息添加额外的头信息(非功能需求),
以便另一个函数 - echo - 能够从中受益。echo 函数保持了简洁,并且只专注于业务逻辑。
您还可以看到使用 spring.cloud.stream.function.bindings 属性来简化组合绑定名称的方法。
具有多个输入和输出参数的函数
从版本 3.0 开始,Spring Cloud Stream 提供了对具有多个输入和/或多个输出(返回值)的函数的支持。这实际上意味着什么以及它针对的是哪种使用场景?
-
大数据:假设您处理的数据源高度无序,包含各种类型的数据元素(例如订单、交易等),并且您需要有效地整理这些数据。
-
数据聚合:另一个用例可能需要您合并来自两个或多个传入流的数据元素
上述仅描述了您可能需要使用单个函数来接受和/或生成多个数据流的一些用例。这正是我们这里的目标用例。
此外,请注意此处对流概念的强调略有不同。假设只有当这些函数能够访问实际的数据流(而不是单个元素)时,它们才具有价值。因此,我们依赖于Project Reactor提供的抽象(即Flux和Mono),它已经作为spring-cloud-functions引入的依赖项存在于类路径上。
另一个重要的方面是多个输入和输出的表示。虽然 Java 提供了多种不同的抽象来表示 多个事物,但这些抽象具有a) 无界性、b) 缺乏元数 和 c) 缺乏类型信息 这些重要特性。例如,我们来看一下Collection或者数组,它们只能用来描述单一类型的多个事物,或向上转换为一个Object,这会影响 Spring Cloud Stream 的透明类型转换功能等。
为了满足所有这些需求,初始支持依赖于使用 Project Reactor 提供的另一种抽象——Tuples 的签名。不过,我们正在努力实现更灵活的签名。
| 请参考绑定和绑定名部分,了解用于建立此类应用程序中使用的绑定名的命名约定。 |
让我们来看几个示例:
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return tuple -> {
Flux<String> stringStream = tuple.getT1();
Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
};
}
}
上述示例演示了一个函数,该函数接受两个输入(第一个类型为String,第二个类型为Integer),并生成一个类型为String的输出。
因此,对于上面的例子,两个输入绑定将是 gather-in-0 和 gather-in-1,为了保持一致性,输出绑定也遵循相同的约定,并命名为 gather-out-0。
知道这一点将允许您设置绑定特定的属性。
例如,以下内容将覆盖 gather-in-0 绑定的 content-type:
--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
}
前面的例子与上一个样本有些相反,演示了接收类型为Integer的单一输入并产生两种类型均为String的输出的功能。
因此,对于上述示例,输入绑定是 scatter-in-0,输出绑定是 scatter-out-0 和 scatter-out-1。
并且您可以用以下代码对其进行测试:
@Test
public void testSingleInputMultiOutput() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleApplication.class))
.run("--spring.cloud.function.definition=scatter")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
for (int i = 0; i < 10; i++) {
inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
}
int counter = 0;
for (int i = 0; i < 5; i++) {
Message<byte[]> even = outputDestination.receive(0, 0);
assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
Message<byte[]> odd = outputDestination.receive(0, 1);
assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
}
}
}
单个应用程序中的多个功能
在某些情况下,可能需要在一个应用程序中分组多个消息处理器。您可以通过定义多个函数来实现此操作。
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
在上面的例子中,我们有配置了两个函数uppercase和reverse。因此,首先,如前所述,我们需要注意到存在冲突(多个函数),所以需要通过提供指向实际要绑定的函数的spring.cloud.function.definition属性来解决它。但在这里我们将使用;分隔符来同时指向这两个函数(参见下面的测试用例)。
| 与具有多个输入/输出的函数一样,请参考[绑定和绑定名称]部分,了解用于建立此类应用程序使用的绑定名称的命名约定。 |
并且您可以用以下代码对其进行测试:
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
ReactiveFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
批量消费者
当使用支持批处理侦听器的 MessageChannelBinder,并且该功能对于消费者绑定已启用时,可以设置 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 为 true 以使整个消息批次通过 List传递给函数。
@Bean
public Function<List<Person>, Person> findFirstPerson() {
return persons -> persons.get(0);
}
批处理类型转换
与单个消息消费者的类型转换类似,批处理要求批中的每个消息都被转换为请求的类型。例如,在前面的例子中,这个类型是Person。
了解每个批处理消息的标头是单独提供的也很重要,MessageHeaders表示整个批次的消息。这些消息及其相应的批处理标头由各自的绑定器创建,并且它们的结构可能不同。因此,您应该参考绑定器文档以了解批处理标头的结构。对于Kafka和Rabbit,请分别搜索amqp_batchedHeaders和kafka_batchConvertedHeaders。
简而言之,如果您有一个表示包含5个有效负载批次的消息,则同一消息将包含一组标头,其中每个标头对应具有相同索引的有效负载。
但是,如果某个特定有效负载无法转换会发生什么?在单个消息场景中,我们简单地返回null,并使用未转换的消息调用您的方法,这要么导致异常,要么允许您根据函数签名处理原始消息。<br/>
批量处理的情况下,情况会稍微复杂一些。如果未转换的有效载荷返回null,则实际上会减少批处理大小。例如,如果原始批次包含5条消息,并且有2条未能成功转换,那么转换后的批次中将只包含3条消息。这可能是可以接受的,但是对应的批处理头信息呢?仍然会有5个头信息,因为它们是在绑定器形成初始批次时创建的。这种差异使得难以将头信息与相应的有效载荷相关联。
为了解决这个问题,我们提供了MessageConverterHelper接口。
public interface MessageConverterHelper {
/**
* This method will be called by the framework in cases when a message failed to convert.
* It allows you to signal to the framework if such failure should be considered fatal or not.
*
* @param message failed message
* @return true if conversion failure must be considered fatal.
*/
default boolean shouldFailIfCantConvert(Message<?> message) {
return false;
}
/**
* This method will be called by the framework in cases when a single message within batch of messages failed to convert.
* It provides a place for providing post-processing logic before message converter returns.
*
* @param message failed message.
* @param index index of failed message within the batch
*/
default void postProcessBatchMessageOnFailure(Message<?> message, int index) {
}
}
如果已实现此接口,则当无法转换特定有效负载时,框架的消息转换器逻辑会调用该接口对批处理消息执行后处理。
默认情况下,Kafka 和 Rabbit 的实现会自动删除相应的批处理头信息,以维护批量负载与其头信息之间的关联。然而,如果您需要为这些情况添加自定义行为,可以提供自己的实现并将其注册为一个 Bean。
此外,该接口提供了一种方法,可以更确定性地处理转换失败的情况。默认情况下,此方法返回false,但如果您希望在发生转换错误时使整个过程失败,则可以自定义实现。
批量生产者
您还可以在生产者端使用批处理的概念,通过返回消息集合来实现,这实际上提供了相反的效果,即集合中的每条消息将由绑定器单独发送。
考虑以下函数:
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
返回列表中的每条消息都将单独发送,从而向输出目标发送四条消息。
作为函数的 Spring 集成流程
实现函数时,您可能有复杂的业务需求,这些需求属于企业集成模式(EIP)类别。这些问题最好使用像Spring Integration(SI)这样的框架来处理,它是EIP的参考实现。
幸运的是,SI已经提供了通过集成流作为网关来暴露集成流为函数的支持。考虑以下示例:
@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
}
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlow.from(MessageFunction.class, spec -> spec.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.log(LoggingHandler.Level.WARN)
.bridge()
.get();
}
public interface MessageFunction extends Function<Message<String>, Message<String>> {
}
}
对于熟悉 SI 的人来说,您可以看到我们定义了一个类型为 IntegrationFlow 的 bean,在其中声明了要作为Function<String, String>(使用 SI DSL)公开的集成流,称为uppercase。MessageFunction接口允许我们显式声明输入和输出的类型,以便进行适当的类型转换。有关类型转换的更多内容,请参见内容协商部分。
要接收原始输入,可以使用from(Function.class, …)。
生成的函数绑定到目标绑定器公开的输入和输出目标。
| 请参考[绑定和绑定名称]部分以了解用于建立此类应用程序的绑定名称所使用的命名约定。 |
有关 Spring Integration 和 Spring Cloud Stream 在函数式编程模型方面的互操作性的更多详细信息,您可能会发现这篇文章非常有趣,因为它深入探讨了通过结合 Spring Integration 和 Spring Cloud Stream/Functions 的最佳实践可以应用的各种模式。
使用轮询消费者
概述
使用轮询消费者时,您按需轮询PollableMessageSource。要定义轮询消费者的绑定,需要提供spring.cloud.stream.pollable-source属性。
考虑以下轮询消费者绑定的示例:
--spring.cloud.stream.pollable-source=myDestination
前面示例中的可轮询源名称 myDestination 将导致 myDestination-in-0 绑定名称保持一致,与函数式编程模型一致。
在前面的示例中,你可能会如下使用轮询消费者:
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition()) {
try {
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}
一种减少手动操作并且更符合 Spring 风格的替代方法是配置一个计划任务 bean。例如,
@Scheduled(fixedDelay = 5_000)
public void poll() {
System.out.println("Polling...");
this.source.poll(m -> {
System.out.println(m.getPayload());
}, new ParameterizedTypeReference<Foo>() { });
}
该 PollableMessageSource.poll() 方法接受一个 MessageHandler 参数(通常是一个lambda表达式,如这里所示)。如果消息被接收并成功处理,则返回 true。
与消息驱动的消费者一样,如果MessageHandler抛出异常,则会将消息发布到错误通道上,如Error Handling中所述。
通常,poll() 方法在 MessageHandler 退出时确认消息。如果该方法异常退出,则会拒绝消息(不会重新入队),但请参阅错误处理。您可以通过自行负责确认操作来覆盖此行为,如下例所示:
@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
return args -> {
while (someCondition()) {
if (!dest1In.poll(m -> {
StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
// e.g. hand off to another thread which can perform the ack
// or acknowledge(Status.REQUEUE)
})) {
Thread.sleep(1000);
}
}
};
}
\
必须在某个时间点 ack(或 nack)消息,以避免资源泄漏。 |
一些消息传递系统(如 Apache Kafka)在日志中维护一个简单的偏移量。如果一条消息传递失败并被重新排队,其偏移量为 StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);,那么任何后续已成功确认的消息都会被重新发送。 |
还有另一种重载方法poll,其定义如下:
poll(MessageHandler handler, ParameterizedTypeReference<?> type)
代码type是转换提示,允许将传入的消息有效负载进行转换,如以下示例所示:
boolean result = pollableSource.poll(received -> {
Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
...
}, new ParameterizedTypeReference<Map<String, Foo>>() {});
处理错误
默认情况下,轮询源会配置一个错误通道;如果回调抛出异常,则向错误通道发送ErrorMessage(<destination>.<group>.errors);此错误通道也会桥接到全局Spring Integration errorChannel。
您可以使用 @ServiceActivator 订阅错误通道中的任何一个以处理错误;如果没有订阅,错误将仅被记录,并且消息将被视为成功确认。
如果错误通道服务激活器抛出异常,默认情况下该消息将被拒绝并且不会重发。
如果服务激活器抛出一个 RequeueCurrentMessageException,则该消息将在代理中重新排队,并在随后的轮询时再次检索。
如果监听器直接抛出一个RequeueCurrentMessageException,正如上面讨论的那样,该消息将被重新排队,并不会发送到错误通道。