生成和使用消息
您可以通过简单地编写函数并将它们公开为@Bean
s.
您还可以使用基于 Spring Integration 注释的配置或
基于 Spring Cloud Stream 注释的配置,尽管从 spring-cloud-stream 3.x 开始
我们建议使用函数式实现。
Spring Cloud 函数支持
概述
从 Spring Cloud Stream v2.1 开始,定义流处理程序和源的另一种替代方法是使用内置的
支持 Spring Cloud 函数,其中它们可以表示为
类型java.util.function.[Supplier/Function/Consumer]
.
要指定要绑定到绑定公开的外部目标的功能 Bean,
您必须提供spring.cloud.function.definition
财产。
如果您只有java.util.function.[Supplier/Function/Consumer] 您可以
跳过spring.cloud.function.definition 属性,因为这样的功能 bean 将被自动发现。然而
使用此类属性以避免任何混淆被认为是最佳实践。
有时这种自动发现可能会妨碍,因为类型为java.util.function.[Supplier/Function/Consumer] 除了处理消息之外,还可以用于其他目的,但它是自动发现和自动绑定的。
对于这些罕见的情况,可以通过提供spring.cloud.stream.function.autodetect 值设置为false . |
下面是应用程序将消息处理程序公开为java.util.function.Function
通过充当数据的使用者和生产者来有效支持直通语义。
@SpringBootApplication
public class MyFunctionBootApp {
public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class);
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}
在前面的示例中,我们定义了一个类型为java.util.function.Function
调用 toUpperCase 充当消息处理程序
其“输入”和“输出”必须绑定到提供的目标绑定器公开的外部目标。
默认情况下,“input”和“output”绑定名称将为toUpperCase-in-0
和toUpperCase-out-0
.
有关用于建立绑定名称的命名约定的详细信息,请参阅功能绑定名称部分。
以下是支持其他语义的简单函数式应用程序的示例:
下面是公开为java.util.function.Supplier
@SpringBootApplication
public static class SourceFromSupplier {
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}
下面是公开为java.util.function.Consumer
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
提供商(来源)
Function
和Consumer
在如何触发它们的调用时非常简单。它们是基于
在发送到它们绑定到的目的地的数据(事件)上。换句话说,它们是经典的事件驱动组件。
然而Supplier
在触发方面属于自己的类别。由于根据定义,它是数据的源(源),因此它不是
订阅任何入站目的地,因此必须由其他一些机制触发。
还有一个问题Supplier
实施,这可能是势在必行的,也可能是被动的,并且与触发此类提供商直接相关。
请考虑以下示例:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
前面的Supplier
bean 每当其get()
方法被调用。但是,谁调用此方法以及多久调用一次?
该框架提供了一个默认的轮询机制(回答“谁?
每一秒(回答“多久一次?
换句话说,上述配置每秒生成一条消息,每条消息都发送到output
由活页夹公开的目标。
要了解如何自定义轮询机制,请参阅轮询配置属性部分。
考虑一个不同的例子:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
前面的Supplier
Bean 采用响应式编程风格。通常,与命令性提供商不同,
它应该只触发一次,因为调用其get()
方法生成(提供)连续的消息流,而不是
个人消息。
该框架识别编程风格的差异,并保证此类提供商仅被触发一次。
但是,想象一下您想要轮询某些数据源并返回表示结果集的有限数据流的用例。 响应式编程风格是这样的提供商的完美机制。然而,鉴于产生的流的有限性, 此类提供商仍需要定期调用。
请考虑以下示例,该示例通过生成有限数据流来模拟此类用例:
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
bean 本身用PollableBean
注释(@Bean
),从而向框架发出信号,尽管实现
对于这样的提供商是被动的,它仍然需要进行轮询。
有一个splittable 属性定义在PollableBean 向此注释的后处理器发出信号
必须拆分注释组件生成的结果,并将其设置为true 默认情况下。这意味着
框架会将发送每个项目的返回拆分为单独的消息。如果不是
他想要的行为你可以将其设置为false 此时,该提供商将简单地返回
产生的通量而不分裂它。 |
提供商和螺纹加工
正如您现在所了解的那样,与Function 和Consumer ,由事件触发(它们有输入数据),Supplier 没有
任何输入,因此由不同的机制触发 - 轮询器,它可能具有不可预测的线程机制。虽然
线程机制大多数时候与函数的下游执行无关,在某些情况下可能会出现问题
特别是对于可能对线程亲和力有一定期望的集成框架。例如,Spring Cloud Sleuth 依赖于
跟踪存储在线程本地的数据。
对于这些情况,我们通过StreamBridge ,用户可以更好地控制线程机制。您可以获得更多详细信息
在将任意数据发送到输出(例如外部事件驱动源)部分。 |
消费者(响应式)
反应性的Consumer
有点特别,因为它有一个 void 返回类型,让框架没有引用可以订阅。
很可能你不需要写Consumer<Flux<?>>
,而是将其写为Function<Flux<?>, Mono<Void>>
调用then
运算符作为流上的最后一个运算符。
例如:
public Function<Flux<?>, Mono<Void>> consumer() {
return flux -> flux.map(..).filter(..).then();
}
但是,如果您确实需要编写一个显式的Consumer<Flux<?>>
,记得订阅传入的通量。
此外,请记住,在混合响应式函数和命令式函数时,相同的规则也适用于函数组合。
Spring Cloud Function 确实支持使用命令式组合响应式函数,但是您必须注意某些限制。
例如,假设您已经使用命令式消费者组成了响应式函数。
这种组合的结果是反应性Consumer
.但是,无法订阅本节前面讨论的此类消费者,
因此,只能通过使您的消费者响应式并手动订阅(如前所述)或将您的函数更改为命令式来解决此限制。
轮询配置属性
以下属性由 Spring Cloud Stream 公开,并以spring.integration.poller.
:
- 固定延迟
-
修复了默认轮询器的延迟(以毫秒为单位)。
默认值:1000L。
- 最大消息PerPoll
-
默认轮询程序的每个轮询事件的最大消息数。
默认值:1L。
- 克罗恩
-
Cron 表达式值。
默认值:无。
- 初始延迟
-
定期触发器的初始延迟。
默认值:0。
- 时间单位
-
要应用于延迟值的 TimeUnit。
默认值:MILLISECONDS。
例如--spring.integration.poller.fixed-delay=2000
将轮询器间隔设置为每两秒轮询一次。
按绑定轮询配置
上一节介绍了如何配置将应用于所有绑定的单个默认轮询器。虽然它非常适合微服务 spring-cloud-stream 模型,该模型专为每个微服务代表单个组件(例如提供商)而设计,因此默认轮询器配置就足够了,但在某些情况下, 您可能有多个组件需要不同的轮询配置
对于这种情况,请使用按绑定的方式配置轮询器。例如,假设您有一个输出绑定supply-out-0
.在这种情况下,您可以为此类
绑定使用spring.cloud.stream.bindings.supply-out-0.producer.poller..
前缀(例如spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000
).
将任意数据发送到输出(例如外部事件驱动源)
在某些情况下,实际的数据源可能来自不是活页夹的外部(外部)系统。例如, 数据源可能是经典的 REST 端点。我们如何将这样的源代码与 spring-cloud-stream 使用的功能机制桥接?
Spring Cloud Stream 提供了两种机制,让我们更详细地了解它们
在这里,对于这两个示例,我们将使用名为delegateToSupplier
绑定到根 Web 上下文,
通过 StreamBridge 机制委托传入请求进行流式传输。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
在这里,我们自动连接一个StreamBridge
bean 的 bean,它允许我们有效地将数据发送到输出绑定
将非流应用程序与 spring-cloud-stream 桥接。请注意,前面的示例没有任何
定义的源函数(例如,Supplier bean)使框架没有触发器来提前创建源绑定,这在配置包含函数 bean 的情况下是典型的。这很好,因为StreamBridge
将启动输出绑定的创建(以及
目标自动配置(如有必要)用于首次调用其时不存在的绑定send(..)
作缓存它
后续重用(有关更多详细信息,请参阅 StreamBridge 和动态目标)。
但是,如果您想在初始化(启动)时预先创建输出绑定,则可以从spring.cloud.stream.output-bindings
属性,您可以在其中声明源的名称。提供的名称将用作创建源绑定的触发器。您可以使用来表示多个源(多个输出绑定)(例如,;
--spring.cloud.stream.output-bindings=foo;bar
)
另外,请注意streamBridge.send(..)
方法采用Object
对于数据。这意味着您可以发送 POJO 或Message
到它和它发送输出时将经历相同的例程,就好像它来自任何提供相同级别的函数或提供商一样与函数保持一致。这意味着输出类型转换、分区等都像来自函数产生的输出一样受到尊重。
与显式绑定创建中解释的不同,StreamBridge 在设计时考虑了性能和根据需要动态创建任意数量绑定的能力。为此,StreamBridge 创建的实际绑定不会缓存在应用程序上下文中,因此无法按照绑定可视化和控制中所述进行管理。 但是,如果您仍然希望使用 StreamBridge 动态创建绑定并在之后进行管理,请在使用 StreamBridge 之前使用以下机制显式创建绑定 - ref:spring-cloud-stream/binding_visualization_control.adocl#_define_new_and_manage_existing_bindings[以编程方式定义新绑定] |
具有异步发送功能的 StreamBridge
StreamBridge
使用Spring Integration框架提供的发送机制,该框架是Spring Cloud Stream的核心。默认情况下,此机制使用发送方的线程。换句话说,发送是阻塞。虽然这在许多情况下是可以的,但在某些情况下,您希望此类发送是异步的。为此,请使用setAsync(true)
方法StreamBridge
在调用其中一个 send 方法之前。
异步发送的可观测性上下文传播
当使用框架提供的可观测性支持以及支持 Spring 框架时,打破线程边界将影响可观测性上下文的一致性,从而影响您的跟踪历史。为避免这种情况,您只需添加context-propagation
依赖形式千分尺(见下文)
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
<version>1.1.0</version>
</dependency>
StreamBridge 和动态目的地
StreamBridge
也可用于与用例类似的输出目的地提前未知的情况
在路由 FROM 使用者部分中进行了描述。
让我们看一下示例
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, args);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("myDestination", body);
}
}
如您所见,前面的示例与上一个示例非常相似,只是通过spring.cloud.stream.output-bindings
属性(未提供)。
在这里,我们将数据发送到myDestination
name 不作为绑定存在。因此,此类名称将被视为动态目的地
如路由 FROM 使用者部分所述。
在前面的示例中,我们使用ApplicationRunner
作为外来来源来喂养溪流。
一个更实际的例子,其中外部源是 REST 端点。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
streamBridge.send("myBinding", body);
}
}
正如您在内部看到的delegateToSupplier
我们使用 StreamBridge 将数据发送到myBinding
捆绑。在这里,您还受益于
的动态特征StreamBridge
其中,如果myBinding
不存在,它将自动创建并缓存,否则将使用现有绑定。
如果存在许多动态目标,缓存动态目标(绑定)可能会导致内存泄漏。拥有一定程度的控制权
我们为默认缓存大小为 10 的输出绑定提供了一种自我逐出缓存机制。这意味着,如果动态目标大小超过该数字,则可能会逐出现有绑定,因此需要重新创建,这可能会导致轻微的性能下降。您可以通过spring.cloud.stream.dynamic-destination-cache-size 属性将其设置为所需值。 |
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/
通过展示两个示例,我们想强调该方法适用于任何类型的外国来源。
如果您使用的是 Solace PubSub+ 绑定器,则 Spring Cloud Stream 已保留scst_targetDestination 标头(可通过BinderHeaders.TARGET_DESTINATION检索),它允许将消息从其绑定的配置目标重定向到此标头指定的目标目标。这允许 Binder 管理发布到动态目标所需的资源,从而减轻框架的必要性,并避免上一个注释中提到的缓存问题。更多信息在这里。 |
使用 StreamBridge 输出内容类型
如有必要,还可以使用以下方法签名提供特定内容类型public boolean send(String bindingName, Object data, MimeType outputContentType)
.
或者,如果您将数据作为Message
,则其内容类型将得到尊重。
在 StreamBridge 中使用特定活页夹类型
Spring Cloud Stream 支持多个 binder 场景。例如,您可能从 Kafka 接收数据并将其发送到 RabbitMQ。
有关多个绑定器场景的更多信息,请参阅绑定器部分,特别是类路径上的多个绑定器
如果您计划使用 StreamBridge,并且在您的应用程序中配置了多个活页夹,您还必须告诉 StreamBridge
使用哪种活页夹。为此,还有两种变体send
方法:
public boolean send(String bindingName, @Nullable String binderType, Object data)
public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)
如您所见,您可以提供一个额外的论点 -binderType
,告诉 BindingService 在创建动态绑定时使用哪个绑定器。
对于以下情况spring.cloud.stream.output-bindings 属性,或者绑定已在不同的绑定器下创建,则binderType 参数将无效。 |
将信道拦截器与 StreamBridge 结合使用
因为StreamBridge
使用MessageChannel
要建立输出绑定,您可以在通过StreamBridge
.
由应用程序决定在哪些信道拦截器上应用StreamBridge
.
Spring Cloud Stream 不会将检测到的所有通道拦截器注入到StreamBridge
除非它们被注释为@GlobalChannelInterceptor(patterns = "*")
.
让我们假设您有以下两个不同的StreamBridge
绑定。
streamBridge.send("foo-out-0", message);
和
streamBridge.send("bar-out-0", message);
现在,如果您希望在两个StreamBridge
bindings,则可以声明以下内容GlobalChannelInterceptor
豆。
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
但是,如果您不喜欢上述全局方法,并且希望为每个绑定配备一个专用的拦截器,那么您可以执行以下作。
@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
和
@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
您可以灵活地使模式更加严格或根据您的业务需求进行定制。
通过这种方法,应用程序能够决定要注入哪些拦截器StreamBridge
而不是应用所有可用的拦截器。
StreamBridge 通过StreamOperations 包含所有send 方法StreamBridge .因此,应用程序可以选择使用StreamOperations .当涉及到StreamBridge 通过为StreamOperations 接口。 |
响应式函数支持
由于 Spring Cloud Function 是构建在 Project Reactor 之上的,因此您不需要做太多事情
在实现时从响应式编程模型中受益Supplier
,Function
或Consumer
.
例如:
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
return flux -> flux.map(val -> val.toUpperCase());
}
}
在选择响应式或命令式编程模型时,必须了解一些重要的事情。 完全响应式还是仅 API? 使用响应式 API 并不一定意味着您可以从此类 API 的所有响应式功能中受益。换句话说,背压和其他高级功能之类的东西只有在与兼容系统(例如响应式 Kafka 绑定器)一起使用时才有效。如果您使用的是常规 Kafka 或 Rabbit 或任何其他非响应式绑定器,则只能从响应式 API 本身的便利性中受益,而不能从其高级功能中受益,因为流的实际源或目标不是响应式的。 错误处理和重试 在本手册中,您将看到一些关于基于框架的错误处理、重试和其他功能以及与之相关的配置属性的参考资料。重要的是要了解它们只影响命令式函数,当涉及到响应式函数时,您不应该有相同的期望。这就是原因......
响应式函数和命令式函数之间存在根本区别。
命令式函数是一个消息处理程序,由框架在收到的每条消息上调用。因此,对于 N 条消息,将有 N 次此类函数的调用,因此我们可以包装此类函数并添加其他功能,例如错误处理、重试等。
响应式函数是初始化函数。它仅调用一次以获取对用户提供的 Flux/Mono 的引用,以与框架提供的 Flux/Mono 连接。在那之后,我们(框架)对流完全没有可见性或控制权。
因此,对于响应式函数,在错误处理和重试(即 |
功能组成
使用函数式编程模型,您还可以从函数组合中受益,其中您可以从一组简单函数动态组合复杂的处理程序。 例如,让我们将以下函数 bean 添加到上面定义的应用程序中
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}
并修改spring.cloud.function.definition
属性来反映您从 'toUpperCase' 和 'wrapInQuotes' 编写新函数的意图。
为此,Spring Cloud Function 依赖于|
(管道)符号。因此,为了完成我们的示例,我们的属性现在将如下所示:
--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function 提供的函数组合支持的一大好处是 事实上,您可以编写响应式和命令式函数。 |
组合的结果是一个函数,正如您可能猜到的那样,它可以有一个非常长且相当神秘的名称(例如,foo|bar|baz|xyz. . .
)
当涉及到其他配置属性时,这带来了很大的不便。这就是功能绑定名称部分中描述的描述性绑定名称功能可以提供帮助的地方。
例如,如果我们想给我们的toUpperCase|wrapInQuotes
我们可以这样做的更具描述性的名称
具有以下属性spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput
允许
引用该绑定名称的其他配置属性(例如spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination
).
功能组成和跨领域关注点
函数组合有效地允许您通过分解复杂性来解决复杂性到一组简单且可单独管理/可测试的组件,这些组件仍然可以在运行时表示为一个。但这并不是唯一的好处。
您还可以使用组合来解决某些横切的非功能性问题, 例如内容丰富。例如,假设您有一条传入消息,该消息可能 缺少某些标题,或者某些标题不处于您的业务的确切状态 函数会期望。您现在可以实现一个单独的函数来解决这些问题 关注点,然后将其与主要业务功能组合在一起。
让我们看一下示例
@SpringBootApplication
public class DemoStreamApplication {
public static void main(String[] args) {
SpringApplication.run(DemoStreamApplication.class,
"--spring.cloud.function.definition=enrich|echo",
"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
"--spring.cloud.stream.bindings.input.destination=myDestination",
"--spring.cloud.stream.bindings.input.group=myGroup");
}
@Bean
public Function<Message<String>, Message<String>> enrich() {
return message -> {
Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
};
}
@Bean
public Function<Message<String>, Message<String>> echo() {
return message -> {
Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
System.out.println("Incoming message " + message);
return message;
};
}
}
虽然微不足道,但此示例演示了一个函数如何使用附加标头(非功能性问题)来丰富传入的消息,
所以另一个函数 -echo
- 可以从中受益。这echo
函数保持干净,仅专注于业务逻辑。
您还可以查看spring.cloud.stream.function.bindings
属性来简化组合绑定名称。
具有多个输入和输出参数的函数
从 3.0 版开始,spring-cloud-stream 支持以下函数: 具有多个输入和/或多个输出(返回值)。这实际上意味着什么以及 它针对什么类型的用例?
-
大数据:想象一下您正在处理的数据源高度无组织,并且包含各种类型的数据元素 (例如,订单、交易等),您需要有效地对其进行整理。
-
数据聚合:另一个用例可能需要您合并来自 2+ 传入_streams的数据元素。
以上仅描述了几个用例,在这些用例中,您可能需要使用单个函数来接受和/或生成 多个数据流。这就是我们在这里针对的用例类型。
另外,请注意此处对流概念的强调略有不同。假设此类函数仅有价值
如果他们被授予对实际数据流(而不是单个元素)的访问权限。因此,我们依赖于
Project Reactor 提供的抽象(即Flux
和Mono
) 上已经在
classpath 作为 spring-cloud-functions 引入的依赖项的一部分。
另一个重要方面是多个输入和输出的表示。虽然 java 提供了
各种不同的抽象来表示这些抽象的多个
是 a) 无界的,b) 缺乏 arity 和 c) 缺乏类型信息,这些信息在这种情况下都很重要。
举个例子,让我们看看Collection
或者一个数组,它只允许我们
描述单个类型的多个或将所有内容上调到Object
,影响的透明类型转换功能
spring-cloud-stream 等。
因此,为了满足所有这些要求,初始支持依赖于使用另一个抽象的签名 由 Project Reactor - Tuples 提供。但是,我们正在努力允许更灵活的签名。
请参阅绑定和绑定名称部分,了解用于建立此类应用程序使用的绑定名称的命名约定。 |
让我们看几个示例:
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return tuple -> {
Flux<String> stringStream = tuple.getT1();
Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
};
}
}
上面的示例演示了接受两个输入(第一个类型String
和第二种类型Integer
)
并生成类型的单个输出String
.
因此,对于上面的示例,两个输入绑定将是gather-in-0
和gather-in-1
为了保持一致性
输出绑定也遵循相同的约定,并命名为gather-out-0
.
了解这一点将允许您设置绑定特定属性。
例如,以下内容将覆盖gather-in-0
捆绑:
--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
}
上面的例子与前面的示例有点相反,并演示了函数
接受类型的单个输入Integer
并产生两个输出(均为String
).
因此,对于上面的示例,输入绑定为scatter-in-0
和
输出绑定是scatter-out-0
和scatter-out-1
.
你用以下代码对其进行测试:
@Test
public void testSingleInputMultiOutput() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleApplication.class))
.run("--spring.cloud.function.definition=scatter")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
for (int i = 0; i < 10; i++) {
inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
}
int counter = 0;
for (int i = 0; i < 5; i++) {
Message<byte[]> even = outputDestination.receive(0, 0);
assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
Message<byte[]> odd = outputDestination.receive(0, 1);
assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
}
}
}
单个应用程序中的多种功能
可能还需要在单个应用程序中对多个消息处理程序进行分组。您可以通过以下方式做到这一点 定义多个功能。
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
在上面的示例中,我们有定义两个函数的配置uppercase
和reverse
.
因此,首先,如前所述,我们需要注意存在冲突(多个函数),因此
我们需要通过提供spring.cloud.function.definition
指向实际函数的属性
我们想绑定。除了这里我们将使用分隔符来指向这两个函数(请参阅下面的测试用例)。;
与具有多个输入/输出的函数一样,请参阅 [绑定和绑定名称] 部分以了解命名 用于建立此类应用程序使用的绑定名称的约定。 |
你用以下代码对其进行测试:
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
ReactiveFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
批处理使用者
使用MessageChannelBinder
支持批量监听器,并且为消费者绑定启用了该功能,您可以将spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
自true
以启用
要传递给函数的整批消息List
.
@Bean
public Function<List<Person>, Person> findFirstPerson() {
return persons -> persons.get(0);
}
批次类型转换
与单个消息使用者的类型转换类似,批处理要求将批处理中的每条消息转换为请求的类型。例如,在前面的示例中,此类型是Person
.
同样重要的是要了解,批处理中每条消息的标头在MessageHeaders
表示整个批次的消息。这些消息及其相应的批处理标头由相应的绑定器创建,它们的结构可能有所不同。因此,您应该参考活页夹文档来了解批处理标头的结构。对于 Kafka 和 Rabbit,您可以搜索amqp_batchedHeaders
和kafka_batchConvertedHeaders
分别。
简而言之,如果您有一条消息表示具有 5 个有效负载的批处理,则同一消息将包含一组标头,其中每个标头对应于具有相同索引的有效负载。
但是,如果特定有效负载无法转换会怎样?在单个消息方案中,我们只需返回 null 并使用未转换的消息调用您的方法,这会导致异常或允许您处理原始消息,具体取决于函数的签名。
在批处理的情况下,事情要复杂一些。为未转换的有效负载返回 null 可以有效地减小批处理大小。例如,如果原始批次包含 5 条消息,其中 2 条无法转换,则转换后的批次将仅包含 3 条消息。这可能是可以接受的,但是相应的批处理标头呢?仍然会有 5 个标头,因为它们是在活页夹形成初始批次时创建的。这种差异使得难以将标头与其相应的有效负载相关联。
为了解决此问题,我们提供了 MessageConverterHelper 接口。
public interface MessageConverterHelper {
/**
* This method will be called by the framework in cases when a message failed to convert.
* It allows you to signal to the framework if such failure should be considered fatal or not.
*
* @param message failed message
* @return true if conversion failure must be considered fatal.
*/
default boolean shouldFailIfCantConvert(Message<?> message) {
return false;
}
/**
* This method will be called by the framework in cases when a single message within batch of messages failed to convert.
* It provides a place for providing post-processing logic before message converter returns.
*
* @param message failed message.
* @param index index of failed message within the batch
*/
default void postProcessBatchMessageOnFailure(Message<?> message, int index) {
}
}
如果实现,则框架的消息转换器逻辑会调用此接口,以便在无法转换特定有效负载时对批处理消息执行后处理。
Kafka 和 Rabbit 的默认实现会自动删除相应的批处理标头,以保持批处理有效负载与其标头之间的关联。但是,如果您需要为此类情况添加自定义行为,则可以提供自己的实现并将其注册为 bean。
此外,该接口还提供了一种方法,可以更确定地处理转换失败。默认情况下,此方法返回false
,但如果您希望在发生转换错误时使整个过程失败,则可以自定义实现。
批量生产者
您还可以通过返回 Messages 的集合来在生产者端使用批处理的概念,该集合有效地提供了 相反效应,其中集合中的每条消息都将由活页夹单独发送。
考虑以下函数:
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
返回列表中的每条消息将单独发送,从而将四条消息发送到输出目标。
Spring Integration 流作为函数
实现函数时,您可能有适合该类别的复杂需求 企业集成模式 (EIP)。最好使用 框架,例如 Spring Integration(SI),它是 EIP 的参考实现。
值得庆幸的是,SI 已经支持通过集成流作为网关将集成流公开为函数考虑以下示例:
@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
}
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlow.from(MessageFunction.class, spec -> spec.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.log(LoggingHandler.Level.WARN)
.bridge()
.get();
}
public interface MessageFunction extends Function<Message<String>, Message<String>> {
}
}
对于熟悉 SI 的人,您可以看到我们定义了一个类型为IntegrationFlow
我们在哪里
将我们要公开为的集成流声明为Function<String, String>
(使用 SI DSL)调用uppercase
.
这MessageFunction
接口允许我们显式声明输入和输出的类型,以便进行适当的类型转换。
有关类型转换的更多信息,请参阅内容类型协商部分。
要接收原始输入,您可以使用from(Function.class, …)
.
生成的函数绑定到目标绑定器公开的输入和输出目标。
请参考【绑定和绑定名称】部分了解命名 用于建立此类应用程序使用的绑定名称的约定。 |
有关 Spring Integration 和 Spring Cloud Stream 互作性的更多详细信息,特别是围绕函数式编程模型 您可能会发现这篇文章非常有趣,因为它深入探讨了 通过合并 Spring Integration 和 Spring Cloud Stream/Functions 的优点来应用各种模式。
使用轮询的消费者
概述
使用轮询的消费者时,轮询PollableMessageSource
按需。
要为轮询的消费者定义绑定,您需要提供spring.cloud.stream.pollable-source
财产。
请考虑以下轮询使用者绑定的示例:
--spring.cloud.stream.pollable-source=myDestination
可轮询源名称myDestination
在前面的示例中,将导致myDestination-in-0
保留的绑定名称
与函数式编程模型一致。
给定前面示例中轮询的使用者,您可以按如下方式使用它:
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition()) {
try {
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}
一种不那么手动且更像 Spring 的替代方法是配置一个计划任务 bean。例如
@Scheduled(fixedDelay = 5_000)
public void poll() {
System.out.println("Polling...");
this.source.poll(m -> {
System.out.println(m.getPayload());
}, new ParameterizedTypeReference<Foo>() { });
}
这PollableMessageSource.poll()
方法采用MessageHandler
参数(通常是 lambda 表达式,如下所示)。
它返回true
如果收到并成功处理了消息。
与消息驱动的消费者一样,如果MessageHandler
抛出异常,消息发布到错误通道,
如Error Handling
.
通常,poll()
方法在MessageHandler
出口。
如果方法异常退出,则消息将被拒绝(不重新排队),但请参阅处理错误。
可以通过对确认负责来覆盖该行为,如以下示例所示:
@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
return args -> {
while (someCondition()) {
if (!dest1In.poll(m -> {
StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
// e.g. hand off to another thread which can perform the ack
// or acknowledge(Status.REQUEUE)
})) {
Thread.sleep(1000);
}
}
};
}
您必须ack (或nack ) 消息,以避免资源泄漏。 |
某些消息传递系统(例如 Apache Kafka)在日志中维护一个简单的偏移量。如果投放失败并重新排队,则StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); ,则重新传递任何后来成功确认的邮件。 |
还有一个过载的poll
方法,其定义如下:
poll(MessageHandler handler, ParameterizedTypeReference<?> type)
这type
是一个转换提示,允许转换传入消息有效负载,如以下示例所示:
boolean result = pollableSource.poll(received -> {
Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
...
}, new ParameterizedTypeReference<Map<String, Foo>>() {});
处理错误
默认情况下,为可轮询源配置错误通道;如果回调抛出异常,则ErrorMessage
被发送到错误通道 (<destination>.<group>.errors
);此错误通道也桥接到全局 Spring IntegrationerrorChannel
.
您可以使用@ServiceActivator
处理错误;如果没有订阅,则错误将被简单地记录下来,并且消息将被确认为成功。
如果错误通道服务激活器引发异常,则该消息将被拒绝(默认),并且不会重新传递。
如果服务激活器抛出RequeueCurrentMessageException
,消息将在代理处重新排队,并在后续轮询中再次检索。
如果监听器抛出RequeueCurrentMessageException
如上所述,消息将直接重新排队,并且不会发送到错误通道。