此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

事件路由

在 Spring Cloud Stream 的上下文中,事件路由是 a) 将事件路由到特定事件订阅者b) 将事件订阅者生成的事件路由到特定目的地的能力。 在这里,我们将其称为路由“TO”和路由“FROM”。spring-doc.cadn.net.cn

路由到消费者

路由可以依靠RoutingFunction在 Spring Cloud Function 3.0 中可用。您需要做的就是通过--spring.cloud.stream.function.routing.enabled=true应用程序属性或提供spring.cloud.function.routing-expression财产。 启用后RoutingFunction将绑定到输入目的地 接收所有消息并根据提供的指令将它们路由到其他功能。spring-doc.cadn.net.cn

为了绑定,路由目标的名称是functionRouter-in-0(请参阅RoutingFunction.FUNCTION_NAME和绑定命名约定 [功能绑定名称])。

可以使用单个消息以及应用程序属性提供指令。spring-doc.cadn.net.cn

下面是几个示例:spring-doc.cadn.net.cn

使用邮件头

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

通过向functionRouter-in-0绑定器公开的目的地(即 rabbit、kafka), 此类消息将路由到适当的(“偶数”或“奇数”)使用者。spring-doc.cadn.net.cn

默认情况下RoutingFunction会寻找一个spring.cloud.function.definitionspring.cloud.function.routing-expression(对于使用 SpEL 的更动态场景) 标头,如果找到,其值将被视为路由指令。spring-doc.cadn.net.cn

例如 设置spring.cloud.function.routing-expressionheader 到值T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd'最终将半随机地将请求路由到任一oddeven功能。 此外,对于 SpEL,求值上下文的根对象Message因此,您也可以对单个标头(或消息)进行评估…​.routing-expression=headers['type']spring-doc.cadn.net.cn

使用应用程序属性

spring.cloud.function.routing-expression和/或spring.cloud.function.definition可以作为应用程序属性传递(例如spring.cloud.function.routing-expression=headers['type'].spring-doc.cadn.net.cn

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}
通过应用程序属性传递指令对于响应式函数尤为重要,因为响应式函数 函数仅调用一次以传递发布者,因此对单个项的访问受到限制。

路由功能和输出绑定

RoutingFunction是一个Function因此,对待方式与任何其他职能没有什么不同。井。。。几乎。spring-doc.cadn.net.cn

什么时候RoutingFunction到另一个路由Function,其输出被发送到RoutingFunction哪 是functionRouter-in-0不出所料。但如果RoutingFunction路由到Consumer?换句话说,调用的结果 的RoutingFunction可能不会产生任何要发送到输出绑定的内容,因此甚至有必要有一个。 所以,我们确实对待RoutingFunction当我们创建绑定时,情况略有不同。即使它对您作为用户是透明的 (你真的没什么可做的),了解一些机制将有助于你理解它的内部运作方式。spring-doc.cadn.net.cn

所以,规则是; 我们从不为RoutingFunction,仅输入。因此,当您路由到ConsumerRoutingFunction有效 变成Consumer通过没有任何输出绑定。但是,如果RoutingFunction碰巧路由到另一个Function产生 输出,输出绑定RoutingFunction将动态创建,此时RoutingFunction将作为常规Function关于绑定(同时具有输入和输出绑定)。spring-doc.cadn.net.cn

路由 FROM 消费者

除了静态目标之外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目标。 例如,当需要在运行时确定目标目标时,这很有用。 应用程序可以通过以下两种方式之一执行此作。spring-doc.cadn.net.cn

spring.cloud.stream.sendto.destination

您还可以委托给框架,通过指定spring.cloud.stream.sendto.destination页眉 设置为要解析的目标的名称。spring-doc.cadn.net.cn

请考虑以下示例:spring-doc.cadn.net.cn

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

尽管在这个例子中你可以清楚地看到微不足道,但我们的输出是一个带有spring.cloud.stream.sendto.destination页眉 设置为 he 输入参数的值。框架将查阅此标头,并尝试创建或发现 具有该名称的目标,并向其发送输出。spring-doc.cadn.net.cn

如果预先知道目标名称,则可以像配置任何其他目标一样配置生产者属性。 或者,如果您注册了NewDestinationBindingCallback<>bean,则在创建绑定之前调用它。 回调采用绑定器使用的扩展生产者属性的泛型类型。 它有一个方法:spring-doc.cadn.net.cn

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例显示如何使用 RabbitMQ 绑定器:spring-doc.cadn.net.cn

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
如果您需要支持具有多种活页夹类型的动态目标,请使用Object对于泛型类型,并将extended参数。

另外,请参阅[使用StreamBridge]部分,了解如何在类似情况下使用另一个选项(StreamBridge)。spring-doc.cadn.net.cn