|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
绑定属性
绑定属性通过使用格式为 spring.cloud.stream.bindings.<bindingName>.<property>=<value> 的方式提供。
<bindingName> 表示正在配置的绑定的名称。
例如,对于以下功能
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
这两个绑定分别命名为uppercase-in-0,用于输入,以及uppercase-out-0,用于输出。有关更多信息,请参阅[绑定和绑定名称]。
To avoid repetition, Spring Cloud Stream 支持为所有绑定设置值,格式为 spring.cloud.stream.default.<property>=<value>
和 spring.cloud.stream.default.<producer|consumer>.<property>=<value> 用于常见的绑定属性。 |
当涉及到为长时间绑定属性避免重复时,应使用这种格式 - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>。
通用绑定属性
这些属性通过org.springframework.cloud.stream.config.BindingProperties公开
以下为输入和输出绑定都可用的绑定属性,必须以 spring.cloud.stream.bindings.<bindingName>. 前缀(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock)。
默认值可以通过使用spring.cloud.stream.default前缀(例如spring.cloud.stream.default.contentType=application/json)来设置。
- 目的地
-
绑定到已绑定中间件的目标目的地(例如,RabbitMQ交换机或Kafka主题)。
如果绑定表示使用者绑定(输入),则它可以绑定到多个目标,并且可以指定目标名称作为以逗号分隔的
String值。否则,将使用实际绑定名称。
此属性的默认值无法被覆盖。
- group
-
绑定的消费者组。 仅适用于入站绑定。 见消费者组。
默认:
null(表示匿名消费者)。 - 内容类型
-
此绑定的内容类型。请参阅
Content Type Negotiation。默认值:
application/json。 - 绑定器
-
用于此绑定的绑定程序。 请参阅
Multiple Binders on the Classpath获取详细信息。Default:
null(the default binder is used, if it exists).
消费者属性
这些属性通过org.springframework.cloud.stream.binder.ConsumerProperties公开
以下绑定属性仅适用于输入绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.consumer.为前缀(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3)。
默认值可以通过使用spring.cloud.stream.default.consumer前缀(例如,spring.cloud.stream.default.consumer.headerMode=none)设置。
- 自动启动
-
触发此消费者自动启动信号
默认值:
true。 - 并发
-
inbound consumer 的并发量。
默认值:
1。 - 分区化的
-
消费者是否从分区生产者接收数据。
默认值:
false。 - 页面模式
-
设置为
none时,禁用输入上的标题解析。仅对不原生支持消息标题且需要嵌入标题的消息中间件有效。当从非Spring Cloud Stream应用程序消费数据且不支持原生标题时,此选项很有用。设置为headers时,它使用中间件的原生标题机制。设置为embeddedHeaders时,它将标题嵌入到消息负载中。默认值取决于绑定器实现。
- maxAttempts
-
如果处理失败,重试处理消息的次数(包括第一次)。设置为
1以禁用重试。默认值:
3。 - 重试初始间隔
-
重试时的退避初始间隔。
默认值:
1000。 - 重试最大间隔
-
最大退避间隔。
默认值:
10000。 - 退避乘数
-
退避乘数。
默认值:
2.0。 - 默认可重试
-
监听器抛出且未列在
retryableExceptions中的异常是否可重试。默认值:
true。 - 实例数量
-
设置为大于等于零的值时,可以自定义此消费者的实例数量(如果不同于
spring.cloud.stream.instanceCount)。
当设置为负值时,默认为spring.cloud.stream.instanceCount。
有关更多信息,请参见Instance Index and Instance Count。默认值:
-1。 - 实例索引
-
当设置为大于等于零的值时,它允许自定义此消费者的实例索引(如果不同于
spring.cloud.stream.instanceIndex)。当设置为负值时,默认为spring.cloud.stream.instanceIndex。如果提供了instanceIndexList,则会被忽略。Instance Index and Instance Count了解更多详细信息。默认值:
-1。 - 实例索引列表
-
与不支持原生分区的绑定器一起使用(例如RabbitMQ);允许应用程序实例从多个分区中消费。
默认:空。
- 可重试异常
-
一个以 Throwable 类名作为键,布尔值作为值的映射。
指定那些将要或不会重试的异常(以及子类)。
另请参阅defaultRetryable。
示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false。默认:空。
- 使用原生解码
-
当设置为
true时,入站消息将由客户端库直接反序列化,必须相应地配置客户端库(例如,设置适当的 Kafka 生产者值反序列化器)。
当使用此配置时,入站消息的反序列化不基于绑定的contentType。
当使用原生解码时,生产者的责任是使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。
另外,当使用原生编码和解码时,headerMode=embeddedHeaders属性被忽略且标题未嵌入到消息中。
参见生产者属性useNativeEncoding。默认值:
false。 - 多路复用
-
设置为true时,底层绑定器将在同一输入绑定上原生多路复用目标。
默认值:
false。
高级消费者配置
对于消息驱动消费者的基础消息监听容器的高级配置,请向应用上下文中添加单个ListenerContainerCustomizerbean。在上述属性被应用后,它将被调用,并可用于设置其他属性。同样地,对于轮询消费者,请添加一个MessageSourceCustomizerbean。
以下是RabbitMQ绑定器的示例:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}
@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}
生产者属性
这些属性通过org.springframework.cloud.stream.binder.ProducerProperties公开
以下绑定属性仅适用于输出绑定,并且必须以前缀spring.cloud.stream.bindings.<bindingName>.producer.(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id)。
可以通过使用前缀 spring.cloud.stream.default.producer 来设置默认值(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id)。
- 自动启动
-
触发此消费者自动启动信号
默认值:
true。 - 分区键表达式
-
一个SpEL表达式,用于确定如何划分传出数据。
如果设置,则对此绑定上的传出数据进行分区。partitionCount必须设置为大于1的值才能生效。
请参阅Partitioning。默认值:null。
- 分区键提取器名称
-
实现
PartitionKeyExtractorStrategy的 bean 的名称。用于提取用作计算分区 ID 的键(参见“partitionSelector*”)。与“partitionKeyExpression”互斥。默认值:null。
- 分区选择器名称
-
实现
PartitionSelectorStrategy的 Bean 名称。用于根据分区键确定分区 ID(参见 'partitionKeyExtractor*')。与 'partitionSelectorExpression' 相互排斥。默认值:null。
- 分区选择器表达式
-
用于自定义分区选择的 SpEL 表达式。如果两者均未设置,则分区将选择为
hashCode(key) % partitionCount,其中key是通过partitionKeyExpression计算得出的。默认值:
null。 - 分区数量
-
启用分区时,数据的目标分区数。如果生产者已分区,则必须设置为大于1的值。在Kafka中,这被视为一个提示。此值与目标主题的分区数中的较大值将被使用。
默认值:
1。 - 必需的组
-
一个逗号分隔的组列表,生产者必须确保即使它们在创建后启动,消息交付也会得到保障(例如,在RabbitMQ中预先创建持久队列)。
- 页面模式
-
设置为
none时,它会在输出上禁用头嵌入。仅当消息中间件不支持原生消息头且需要头嵌入时才有效。当生产数据用于非Spring Cloud Stream应用程序且不支持原生头时,此选项很有用。
设置为headers时,它使用中间件的原生头机制。
设置为embeddedHeaders时,它将头嵌入到消息负载中。默认值取决于绑定器实现。
- 使用原生编码
-
当设置为
true时,出站消息由客户端库直接序列化,必须相应地进行配置(例如,设置适当的 Kafka 生产者值序列化程序)。 使用此配置时,出站消息的封送不是基于绑定的contentType。 使用原生编码时,消费者有责任使用适当的解码器(例如,Kafka 消费者值反序列化程序)来反序列化入站消息。 另外,当使用原生编码和解码时,headerMode=embeddedHeaders属性被忽略且不将标头嵌入消息中。 参见消费者属性useNativeDecoding。默认值:
false。 - 错误通道启用
-
设置为true时,如果绑定器支持异步发送结果,则会将发送失败的消息发送到目标的错误通道。更多信息请参见错误处理。
默认值:false。
高级生产者配置
在某些情况下,仅使用生产者属性不足以正确配置绑定器中的消息处理器(MessageHandler),或者您可能更倾向于采用编程方式来配置此类生产消息处理器。无论出于何种原因,Spring Cloud Stream 提供了 ProducerMessageHandlerCustomizer 来实现这一目标。
@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Configure a {@link MessageHandler} that is being created by the binder for the
* provided destination name.
* @param handler the {@link MessageHandler} from the binder.
* @param destinationName the bound destination name.
*/
void configure(H handler, String destinationName);
}
正如您所见,它为您提供对实际生产实例MessageHandler的访问权限,您可以根据需要进行配置。
您所需要做的就是提供此策略的实现,并将其配置为@Bean。