对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
绑定属性
绑定属性通过使用spring.cloud.stream.bindings.<bindingName>.<property>=<value>
.
这<bindingName>
表示正在配置的绑定的名称。
例如,对于以下函数
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
有两个名为uppercase-in-0
用于输入和uppercase-out-0
用于输出。有关详细信息,请参阅 [绑定和绑定名称]。
为了避免重复,Spring Cloud Stream 支持为所有绑定设置值,格式为spring.cloud.stream.default.<property>=<value> 和spring.cloud.stream.default.<producer|consumer>.<property>=<value> 用于常见的绑定属性。 |
当涉及到避免重复扩展绑定属性时,应使用以下格式 -spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>
.
常见绑定属性
这些属性通过org.springframework.cloud.stream.config.BindingProperties
以下绑定属性可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.
(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock
).
可以使用spring.cloud.stream.default
前缀(例如spring.cloud.stream.default.contentType=application/json
).
- 目的地
-
绑定中间件上绑定的目标目标(例如,RabbitMQ 交换或 Kafka 主题)。 如果绑定表示使用者绑定(输入),则可以将其绑定到多个目标,并且目标名称可以指定为逗号分隔
String
值。 如果没有,则改用实际的绑定名称。 无法覆盖此属性的默认值。 - 群
-
绑定的使用者组。 仅适用于入站绑定。 请参阅消费者组。
违约:
null
(表示匿名消费者)。 - 内容类型
-
此绑定的内容类型。 看
Content Type Negotiation
.违约:
application/json
. - 粘结 剂
-
此绑定使用的活页夹。 看
Multiple Binders on the Classpath
了解详情。违约:
null
(使用默认活页夹(如果存在)。
消费物业
这些属性通过org.springframework.cloud.stream.binder.ConsumerProperties
以下绑定属性仅适用于输入绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.consumer.
(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3
).
可以使用spring.cloud.stream.default.consumer
前缀(例如spring.cloud.stream.default.consumer.headerMode=none
).
- 自动启动
-
是否需要自动启动此使用者的信号
违约:
true
. - 并发
-
入站使用者的并发性。
违约:
1
. - 分区
-
使用者是否从分区生产者接收数据。
违约:
false
. - headerMode
-
当设置为
none
,禁用输入上的标头解析。 仅对本机不支持消息头且需要嵌入标头的消息传递中间件有效。 当不支持本机标头时,从非 Spring Cloud Stream 应用程序使用数据时,此选项非常有用。 当设置为headers
,它使用中间件的原生标头机制。 当设置为embeddedHeaders
,它将标头嵌入到消息有效负载中。默认值:取决于活页夹实现。
- 最大尝试次数
-
如果处理失败,则尝试处理消息的次数(包括第一次)。 设置为
1
以禁用重试。违约:
3
. - backOff初始间隔
-
重试时的回退初始间隔。
违约:
1000
. - backOffMax间隔
-
最大回退间隔。
违约:
10000
. - backOff乘数
-
退避乘数。
违约:
2.0
. - default可重试
-
监听器抛出的异常是否未在
retryableExceptions
是可重试的。违约:
true
. - 实例计数
-
当设置为大于等于零的值时,它允许自定义此消费者的实例计数(如果不同时
spring.cloud.stream.instanceCount
). 当设置为负值时,它默认为spring.cloud.stream.instanceCount
. 看Instance Index and Instance Count
了解更多信息。违约:
-1
. - 实例索引
-
当设置为大于等于零的值时,它允许自定义此消费者的实例索引(如果与
spring.cloud.stream.instanceIndex
). 当设置为负值时,它默认为spring.cloud.stream.instanceIndex
. 如果出现以下情况,则忽略instanceIndexList
被提供。 看Instance Index and Instance Count
了解更多信息。违约:
-1
. - 实例索引列表
-
与不支持原生分区的绑定器(如RabbitMQ)一起使用;允许应用程序实例从多个分区使用。
默认值:空。
- retryableExceptions
-
键中的 Throwable 类名和值中的布尔值的映射。 指定那些将重试或不会重试的异常(和子类)。 另请参阅
defaultRetriable
. 例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
.默认值:空。
- 使用原生解码
-
当设置为
true
,则入站消息由客户端库直接反序列化,必须相应地配置(例如,设置适当的 Kafka 生产者值反序列化器)。 使用此配置时,入站消息解组不是基于contentType
的绑定。 使用本机解码时,生产者负责使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。 此外,当使用本机编码和解码时,headerMode=embeddedHeaders
属性被忽略,并且标头不会嵌入到消息中。 查看生产者属性useNativeEncoding
.违约:
false
. - 多重
-
设置为 true 时,基础绑定器将本机多路复用同一输入绑定上的目标。
违约:
false
.
高级消费者配置
要对消息驱动的使用者的底层消息侦听器容器进行高级配置,请添加单个ListenerContainerCustomizer
bean 到应用程序上下文。
它将在应用上述属性后调用,并可用于设置其他属性。
同样,对于轮询的消费者,将MessageSourceCustomizer
豆。
以下是 RabbitMQ 绑定器的示例:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}
@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}
生产者属性
这些属性通过org.springframework.cloud.stream.binder.ProducerProperties
以下绑定属性仅适用于输出绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.producer.
(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
).
可以使用前缀spring.cloud.stream.default.producer
(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id
).
- 自动启动
-
是否需要自动启动此使用者的信号
违约:
true
. - partitionKey表达式
-
确定如何对出站数据进行分区的 SpEL 表达式。 如果设置,则此绑定上的出站数据将被分区。
partitionCount
必须设置为大于 1 的值才能生效。 看Partitioning
.默认值:null。
- partitionKeyExtractorName (分区密钥提取器名称)
-
实现的 Bean 的名称
PartitionKeyExtractorStrategy
.用于提取用于计算的密钥 分区 ID(参见 'partitionSelector*')。与 'partitionKeyExpression' 互斥。默认值:null。
- partitionSelectorName (分区选择器名称)
-
实现的 Bean 的名称
PartitionSelectorStrategy
.用于确定基于分区 ID 的 在分区键上(参见 'partitionKeyExtractor*')。与 'partitionSelectorExpression' 互斥。默认值:null。
- partitionSelector表达式
-
用于自定义分区选择的 SpEL 表达式。 如果两者都未设置,则将分区选择为
hashCode(key) % partitionCount
哪里key
通过partitionKeyExpression
.违约:
null
. - 分区计数
-
数据的目标分区数(如果启用了分区)。 如果生产者已分区,则必须设置为大于 1 的值。 在卡夫卡上,它被解释为一种提示。改用此值和目标主题的分区计数中的较大者。
违约:
1
. - 必需组
-
以逗号分隔的组列表,生产者必须确保消息传递到这些组,即使它们在创建后启动(例如,通过在 RabbitMQ 中预先创建持久队列)。
- headerMode
-
当设置为
none
,它会禁用输出上的标头嵌入。 它仅对本机不支持消息标头且需要标头嵌入的消息传递中间件有效。 当不支持本机标头时,当为非 Spring Cloud Stream 应用程序生成数据时,此选项非常有用。 当设置为headers
,它使用中间件的原生标头机制。 当设置为embeddedHeaders
,它将标头嵌入到消息有效负载中。默认值:取决于活页夹实现。
- 使用原生编码
-
当设置为
true
,则出站消息由客户端库直接序列化,必须相应地配置(例如,设置适当的 Kafka 生产者值序列化器)。 使用此配置时,出站消息封送不是基于contentType
的绑定。 使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka 使用者值反序列化程序)来反序列化入站消息。 此外,当使用本机编码和解码时,headerMode=embeddedHeaders
属性被忽略,并且标头不会嵌入到消息中。 查看消费者属性useNativeDecoding
.违约:
false
. - 错误通道启用
-
当设置为 true 时,如果 Binder 支持异步发送结果,则发送失败将发送到目标的错误通道。有关详细信息,请参阅错误处理。
默认值:false。
高级生产者配置
在某些情况下,生产者属性不足以在绑定器中正确配置生成的 MessageHandler,或者您可能更喜欢编程方法
同时配置此类生成的 MessageHandler。不管是什么原因,spring-cloud-stream 都提供了ProducerMessageHandlerCustomizer
来完成它。
@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Configure a {@link MessageHandler} that is being created by the binder for the
* provided destination name.
* @param handler the {@link MessageHandler} from the binder.
* @param destinationName the bound destination name.
*/
void configure(H handler, String destinationName);
}
如您所见,它使您可以访问生产的实际实例MessageHandler
您可以根据需要进行配置。
您需要做的就是提供此策略的实现并将其配置为@Bean
.