|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
事件路由
在Spring Cloud Stream的背景下,事件路由是指既可以 a) 将事件路由到特定的事件订阅者 或者 b) 将事件订阅者产生的事件路由到特定的目的地。 在这里,我们将这种能力称为路由到(route TO)和路由来自(route FROM)。
路由到消费者
可以通过依赖 Spring Cloud Function 3.0 中提供的 RoutingFunction 实现路由。您所需要做的就是通过--spring.cloud.stream.function.routing.enabled=true应用属性启用它,或者提供 spring.cloud.function.routing-expression 属性。
一旦启用,RoutingFunction 将绑定到输入目标并接收所有消息,并根据所提供的指令将它们路由到其他函数。
为了绑定路由目标名称的目的,该名称为functionRouter-in-0(参见RoutingFunction.FUNCTION_NAME和绑定命名约定函数绑定名称)。 |
说明可以与各个消息以及应用程序属性一起提供。
这里有几个示例:
使用消息头
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class,
"--spring.cloud.stream.function.routing.enabled=true");
}
@Bean
public Consumer<String> even() {
return value -> {
System.out.println("EVEN: " + value);
};
}
@Bean
public Consumer<String> odd() {
return value -> {
System.out.println("ODD: " + value);
};
}
}
通过向绑定器(即 Rabbit、Kafka)公开的 functionRouter-in-0 目标发送消息,
该消息将被路由到适当的消费者(“偶数”或“奇数”)。
默认情况下,RoutingFunction 将查找一个 spring.cloud.function.definition 或 spring.cloud.function.routing-expression(用于更动态的场景,带有 SpEL)
标题,如果找到,其值将被视为路由指令。
例如,将头部spring.cloud.function.routing-expression设置为值T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd'会随机地将请求路由到odd或even函数之一。另外,对于SpEL,评估上下文的根对象是Message,因此你也可以在各个标题(或消息)上进行评估….routing-expression=headers['type']
使用应用程序属性
可以将 spring.cloud.function.routing-expression 和/或 spring.cloud.function.definition 作为应用程序属性传递(例如,spring.cloud.function.routing-expression=headers['type']。
@SpringBootApplication
public class RoutingStreamApplication {
public static void main(String[] args) {
SpringApplication.run(RoutingStreamApplication.class,
"--spring.cloud.function.routing-expression="
+ "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
}
@Bean
public Consumer<Integer> even() {
return value -> System.out.println("EVEN: " + value);
}
@Bean
public Consumer<Integer> odd() {
return value -> System.out.println("ODD: " + value);
}
}
| 通过应用属性传递指令对于响应式函数尤其重要,因为响应式函数仅被调用一次以传递Publisher,所以对各个项目的访问是有限的。 |
路由功能和输出绑定
RoutingFunction是一个Function,因此被视为与任何其他函数没有不同。嗯……差不多。
当RoutingFunction路由到另一个Function时,它的输出被发送到RoutingFunction的输出绑定,这与预期一致。
但是,如果RoutingFunction路由到一个Consumer呢?换句话说,调用RoutingFunction的结果可能不会产生任何要发送到输出绑定的内容,因此甚至需要有一个。
所以,在创建绑定时,我们对RoutingFunction做了一些不同的处理。虽然作为用户你对此是透明的(实际上没有什么好做的),但了解一些内部机制会帮助你理解其工作原理。
所以,规则是;
我们从不为 RoutingFunction 创建输出绑定,只创建输入。因此当你路由到 Consumer 时,RoutingFunction 实际上
没有输出绑定的情况下变成了 Consumer。但是,如果 RoutingFunction 恰好路由到另一个产生
输出的 Function,那么在这一点上将动态地创建 RoutingFunction 的输出绑定,此时 RoutingFunction 将作为具有和
消费者路由
除了静态目标外,Spring Cloud Stream 还允许应用程序向动态绑定的目标发送消息。例如,在需要在运行时确定目标目的地的情况下,这很有用。应用程序可以通过两种方式之一来实现。
spring.cloud.stream.sendto.destination
您还可以将输出目标的动态解析委托给框架,方法是将spring.cloud.stream.sendto.destination标题设置为要解析的目标名称。
考虑以下示例:
@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {
@Bean
public Function<String, Message<String>> destinationAsPayload() {
return value -> {
return MessageBuilder.withPayload(value)
.setHeader("spring.cloud.stream.sendto.destination", value).build();};
}
}
尽管这个例子很简单,但我们可以清楚地看到,在这个示例中,我们的输出是一个Message,并且spring.cloud.stream.sendto.destination头部被设置为输入参数的值。框架会查询此头部并尝试创建或查找具有该名称的目标并将输出发送到其中。
如果事先知道目标名称,可以像配置任何其他目的地一样配置生产者属性。或者,如果您注册了NewDestinationBindingCallback<> Bean,则在创建绑定之前会调用它。回调采用Binder使用的扩展生产者属性的通用类型。它有一个方法:
void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
下面的例子展示了如何使用RabbitMQ绑定器:
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
如果您需要支持具有多种绑定器类型的动态目标,请对泛型类型使用Object,并根据需要将extended参数进行强制转换。 |
此外,请参阅使用StreamBridge部分,了解如何利用另一种选项来处理类似情况。