此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
后期处理(发送消息后)
调用函数后,其结果将由框架发送到目标目的地,从而有效地完成函数调用周期。
但是,从业务角度来看,这样的周期可能要等到完成该周期后执行一些额外的任务才能完全完成。
虽然这可以通过简单的组合来实现Consumer
和StreamBridge
正如这篇 Stack Overflow 帖子中所述,从 4.0.3 版本开始,框架
提供了一种更惯用的方法来解决这个问题,方法是PostProcessingFunction
由 Spring Cloud Function 项目提供。
这PostProcessingFunction
是一个特殊的半标记函数,其中包含一个附加方法postProcess(Message>)
设计
为实现此类后处理任务提供场所。
package org.springframework.cloud.function.context . . . public interface PostProcessingFunction<I, O> extends Function<I, O> { default void postProcess(Message<O> result) { } }
所以,现在你有两个选择。
选项 1:您可以将函数实现为PostProcessingFunction
并通过实现其postProcess(Message>)
方法。
private static class Uppercase implements PostProcessingFunction<String, String> { @Override public String apply(String input) { return input.toUpperCase(); } @Override public void postProcess(Message<String> result) { System.out.println("Function Uppercase has been successfully invoked and its result successfully sent to target destination"); } } . . . @Bean public Function<String, String> uppercase() { return new Uppercase(); }
选项 2:如果您已经有一个现有函数并且不想更改其实现或希望将函数保留为 POJO,您可以简单地仅实现postProcess(Message>)
方法,并将这个新的后处理函数与其他函数组合在一起。
private static class Logger implements PostProcessingFunction<?, String> { @Override public void postProcess(Message<String> result) { System.out.println("Function has been successfully invoked and its result successfully sent to target destination"); } } . . . @Bean public Function<String, String> uppercase() { return v -> v.toUpperCase(); } @Bean public Function<String, String> logger() { return new Logger(); } . . . // and then have your function definition as such `uppercase|logger`
注意:
在函数组合的情况下,只有PostProcessingFunction
(如果存在)将生效。例如,假设您有
以下函数定义 -foo|bar|baz
以及两者foo
和baz
是PostProcessingFunction
.只baz.postProcess(Message>)
将被调用。
如果baz
不是PostProcessingFunction
,则不会执行后处理功能。
有人可能会争辩说,您可以通过简单地将后处理器组合为另一个函数组合来轻松做到这一点Function
.然而,这确实是一种可能性
在这种情况下,后处理功能将在调用上一个函数之后和消息发送到目标目标之前立即调用
这是在函数调用周期完成之前。