对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
配置选项
本节包含 Apache Kafka 绑定器使用的配置选项。
有关与活页夹相关的常见配置选项和属性,请参阅核心文档中的绑定属性。
Kafka Binder 属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka 绑定器连接到的代理列表。
违约:
localhost
. - spring.cloud.stream.kafka.binder.defaultBrokerPort 的
-
brokers
允许指定带或不带端口信息的主机(例如,host1,host2:port2
). 当代理列表中未配置端口时,这将设置默认端口。违约:
9092
. - spring.cloud.stream.kafka.binder.configuration
-
客户端属性(生产者和消费者)的键/值映射传递给绑定器创建的所有客户端。由于这些属性都由生产者和消费者使用,因此使用应仅限于公共属性——例如,安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,并且不允许传播。此处的属性取代启动中设置的任何属性。
默认值:空地图。
- spring.cloud.stream.kafka.binder.consumer属性
-
任意 Kafka 客户端使用者属性的键/值映射。除了支持已知的 Kafka 使用者属性外,这里还允许未知使用者属性。此处的属性取代在引导和
configuration
属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.headers
-
由活页夹传输的自定义标头列表。仅当与较旧的应用程序(⇐ 1.3.x)通信时才需要
kafka-clients
版本 < 0.11.0.0。较新的版本原生支持标头。默认值:空。
- spring.cloud.stream.kafka.binder.health超时
-
等待获取分区信息的时间(以秒为单位)。如果此计时器过期,运行状况将报告为关闭。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的确认数。请参阅生产者的 Kafka 文档
acks
财产。违约:
1
. - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在以下情况下有效
autoCreateTopics
或autoAddPartitions
已设置。绑定器在其生成或使用数据的主题上配置的全局最小分区数。它可以被partitionCount
的设置或通过instanceCount * concurrency
生产者的设置(如果其中任何一个较大)。违约:
1
. - spring.cloud.stream.kafka.binder.producer属性
-
任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,这里还允许使用未知的生产者属性。 此处的属性将取代在启动和
configuration
属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.replicationFactor
-
自动创建主题的复制因子
autoCreateTopics
处于活动状态。 可以在每个绑定上覆盖。如果您使用的是 2.4 之前的 Kafka 代理版本,则此值应至少设置为 1
. 从 3.0.8 版开始,绑定器使用-1
作为默认值,这表示代理“default.replication.factor”属性将用于确定副本数。 请咨询您的 Kafka 代理管理员,看看是否有需要最小复制因子的策略,如果是这种情况,则通常会default.replication.factor
将匹配该值,并且-1
应使用,除非您需要大于最小值的复制系数。违约:
-1
. - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true
,活页夹会自动创建新主题。 如果设置为false
,则活页夹依赖于已配置的主题。 在后一种情况下,如果主题不存在,则活页夹无法启动。此设置独立于 auto.create.topics.enable
经纪人的设置,不会影响它。 如果服务器设置为自动创建主题,则可以使用默认代理设置将其作为元数据检索请求的一部分创建。违约:
true
. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true
,如果需要,活页夹会创建新分区。 如果设置为false
,则 binder 依赖于已配置的主题的分区大小。 如果目标主题的分区计数小于预期值,则绑定程序无法启动。违约:
false
. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在活页夹中启用事务。看
transaction.id
在 Kafka 文档中,在spring-kafka
文档。 启用交易后,单个producer
属性被忽略,所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*
性能。默认值
null
(无交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务性活页夹中生产者的全局生产者属性。 看
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和 Kafka 生产者属性以及所有绑定器支持的通用生产者属性。默认值:查看各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
的 bean 名称
KafkaHeaderMapper
用于映射spring-messaging
headers 与 Kafka 标头之间的标头。例如,如果您希望自定义BinderHeaderMapper
对标头使用 JSON 反序列化的 bean。如果这个自定义BinderHeaderMapper
使用此属性的 Bean 不可供 Binder 使用,则 Binder 将查找名称为kafkaBinderHeaderMapper
即BinderHeaderMapper
在回退到默认值之前BinderHeaderMapper
由活页夹创建。默认值:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
将绑定器运行状况设置为
down
,当找到主题上的任何分区时,无论从该分区接收数据的使用者如何,都找不到没有领导者。违约:
true
. - spring.cloud.stream.kafka.binder.certificateStore目录
-
当信任库或密钥库证书位置作为非本地文件系统资源(org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等)给出时, Binder 将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。 对于代理级证书 (
ssl.truststore.location
和ssl.keystore.location
) 和用于架构注册表的证书 (schema.registry.ssl.truststore.location
和schema.registry.ssl.keystore.location
). 请记住,信任库和密钥库位置路径必须在spring.cloud.stream.kafka.binder.configuration…
. 例如spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
,spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
等。 该文件将被复制到指定为此属性值的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。如果未设置此值,并且证书文件是非本地文件系统资源,则它将复制到系统的临时目录,如System.getProperty("java.io.tmpdir")
. 如果存在此值,但在文件系统上找不到该目录或不可写,则也是如此。默认值:无。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetrics启用
-
当设置为 true 时,每当访问指标时,都会计算每个使用者主题的偏移滞后指标。当设置为 false 时,仅使用定期计算的偏移滞后。
默认值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
计算每个使用者主题的偏移滞后的时间间隔。每当
metrics.defaultOffsetLagMetricsEnabled
被禁用或其计算时间过长。默认值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此活页夹中的所有绑定上启用千分尺观察注册表。
默认值:false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicator
元数据使用者group.id
. 此消费者由HealthIndicator
查询有关正在使用的主题的元数据。默认值:无。
Kafka 消费者属性
以下属性仅适用于 Kafka 使用者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.
.
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.consumer.<property>=<value> . |
- admin.configuration
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties
,并且将在将来的版本中删除对它的支持。 - admin.replicas-assignment
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment
,并且将在将来的版本中删除对它的支持。 - admin.replication-factor
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor
,并且将在将来的版本中删除对它的支持。 - autoRebalance启用
-
什么时候
true
,主题分区将在消费者组的成员之间自动重新平衡。 什么时候false
,每个消费者都会根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
. 这需要同时spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
在每个启动的实例上适当设置的属性。 的值spring.cloud.stream.instanceCount
在这种情况下,属性通常必须大于 1。违约:
true
. - ackEach记录
-
什么时候
autoCommitOffset
是true
,则此设置规定是否在处理每条记录后提交偏移量。默认情况下,偏移量将在consumer.poll()
已处理。轮询返回的记录数可以通过max.poll.records
Kafka 属性,通过使用者设置configuration
财产。 将此设置为true
可能会导致性能下降,但这样做会降低发生故障时重新传递记录的可能性。另请参阅活页夹requiredAcks
属性,这也会影响提交偏移量的性能。从 3.1 开始,此属性已被弃用,转而使用ackMode
. 如果ackMode
未设置且未启用批处理模式,RECORD
ackMode 将被使用。违约:
false
. - 自动提交偏移
-
从版本 3.1 开始,此属性已弃用。 看
ackMode
有关备选方案的更多详细信息。是否在处理消息时自动提交偏移量。如果设置为false
,带有键kafka_acknowledgment
的类型org.springframework.kafka.support.Acknowledgment
标头存在于入站消息中。应用程序可以使用此标头来确认消息。有关详细信息,请参阅示例部分。当此属性设置为false
,Kafka 绑定器将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
应用程序负责确认记录。另请参阅ackEachRecord
.违约:
true
. - ack模式
-
指定容器确认模式。 这是基于 Spring Kafka 中定义的 AckMode 枚举。 如果
ackEachRecord
属性设置为true
并且消费者不处于批处理模式,则这将使用RECORD
,否则,请使用此属性使用提供的 ack 模式。 - 自动提交错误
-
在可轮询的消费者中,如果设置为
true
,它总是在错误时自动提交。 如果未设置(默认值)或 false,则不会在可轮询的消费者中自动提交。 请注意,此属性仅适用于可轮询的使用者。默认值:未设置。
- 重置偏移量
-
是否将使用者上的偏移量重置为 startOffset 提供的值。 如果
KafkaBindingRebalanceListener
提供;请参阅 rebalance 侦听器 有关此属性的更多信息,请参阅 reset-offsets。违约:
false
. - 开始偏移量
-
新组的起始偏移量。 允许的值:
earliest
和latest
. 如果为消费者“绑定”显式设置了使用者组(通过spring.cloud.stream.bindings.<channelName>.group
),'startOffset' 设置为earliest
.否则,它设置为latest
对于anonymous
消费者群体。 有关此属性的更多信息,请参阅 reset-offsets。默认值:null(相当于
earliest
). - 启用Dlq
-
当设置为 true 时,它会为使用者启用 DLQ 行为。 默认情况下,导致错误的邮件将转发到名为
error.<destination>.<group>
. 可以通过设置dlqName
属性或通过定义@Bean
类型DlqDestinationResolver
. 这为更常见的 Kafka 重放场景提供了另一种选择,适用于错误数量相对较少且重放整个原始主题可能过于繁琐的情况。 有关更多信息,请参阅 kafka dlq 处理。 从 V2.0 开始,发送到 DLQ 主题的消息将使用以下标头进行增强:x-original-topic
,x-exception-message
和x-exception-stacktrace
如byte[]
. 缺省情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。 有关如何更改该行为,请参阅 dlq 分区选择。不允许在以下情况下使用destinationIsPattern
是true
.违约:
false
. - dlq分区
-
什么时候
enableDlq
为 true,并且未设置此属性,则创建与主主题具有相同分区数的死信主题。 通常,死信记录将发送到死信主题中与原始记录相同的分区。 此行为可以更改;请参阅 DLQ 分区选择。 如果此属性设置为1
并且没有DqlPartitionFunction
bean,所有死信记录都将写入分区0
. 如果此属性大于1
,您必须提供一个DlqPartitionFunction
豆。 请注意,实际分区计数受活页夹的minPartitionCount
财产。违约:
none
- 配置
-
映射包含通用 Kafka 使用者属性的键/值对。 除了具有 Kafka 消费者属性外,还可以在此处传递其他配置属性。 例如,应用程序所需的某些属性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
. 这bootstrap.servers
属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空地图。
- dlq名称
-
要接收错误消息的 DLQ 主题的名称。
默认值:null(如果未指定,则导致错误的消息将转发到名为
error.<destination>.<group>
). - dlqProducer属性
-
使用此功能,可以设置特定于 DLQ 的生产者属性。 通过 kafka 生产者属性提供的所有属性都可以通过此属性进行设置。 当在消费者上启用本机解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以以下形式提供
dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
.默认值:默认 Kafka 生产者属性。
- 标准标头
-
指示入站通道适配器填充的标准标头。 允许的值:
none
,id
,timestamp
或both
. 如果使用本机反序列化并且接收消息的第一个组件需要id
(例如配置为使用 JDBC 消息存储的聚合器)。违约:
none
- 转换器BeanName
-
实现
RecordMessageConverter
.用于入站通道适配器以替换默认的MessagingMessageConverter
.违约:
null
- 空闲事件间隔
-
指示最近未收到任何消息的事件之间的间隔(以毫秒为单位)。 使用
ApplicationListener<ListenerContainerIdleEvent>
以接收这些事件。 有关使用示例,请参阅 pause-resume。违约:
30000
- 目的地是模式
-
当 true 时,目标被视为正则表达式
Pattern
用于匹配代理的主题名称。 如果为 true,则不会预配主题,并且enableDlq
不允许,因为绑定程序在配置阶段不知道主题名称。 请注意,检测与模式匹配的新主题所花费的时间由消费者属性控制metadata.max.age.ms
,(在撰写本文时)默认为 300,000 毫秒(5 分钟)。 这可以使用configuration
属性。违约:
false
- topic.properties
-
一个
Map
配置新主题时使用的 Kafka 主题属性——例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。 在预配新主题时使用。 请参阅
NewTopic
Javadocs 中的kafka-clients
罐。默认值:无。
- topic.replication-factor
-
预配主题时要使用的复制因子。覆盖活页夹范围的设置。 如果出现以下情况,则忽略
replicas-assignments
存在。默认值:无(使用活页夹范围的默认值 -1)。
- pollTimeout
-
超时,用于在可轮询消费者中轮询。
默认值:5 秒。
- 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager
用于覆盖此绑定的绑定程序的事务管理器。 如果您想将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager
. 要实现记录的一次使用和生产,使用者和生产者绑定必须全部配置为相同的事务管理器。默认值:无。
- txCommit已恢复
-
使用事务性绑定器时,默认情况下,恢复记录的偏移量(例如,当重试用尽并将记录发送到死信主题时)将通过新事务提交。 将此属性设置为
false
禁止提交已恢复记录的偏移量。默认值:true。
- commonErrorHandlerBeanName (常见错误处理程序)BeanName
-
CommonErrorHandler
每个消费者绑定使用的 bean 名称。 如果存在,此用户提供了CommonErrorHandler
优先于绑定程序定义的任何其他错误处理程序。 这是表达错误处理程序的便捷方法,如果应用程序不想使用ListenerContainerCustomizer
然后检查目标/组组合以设置错误处理程序。默认值:无。
Kafka 生产者属性
以下属性仅适用于 Kafka 生产者,并且
必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.
.
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.producer.<property>=<value> . |
- admin.configuration
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties
,并且将在将来的版本中删除对它的支持。 - admin.replicas-assignment
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment
,并且将在将来的版本中删除对它的支持。 - admin.replication-factor
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor
,并且将在将来的版本中删除对它的支持。 - 缓冲区大小
-
Kafka 生产者在发送前尝试批处理的数据量上限(以字节为单位)。
违约:
16384
. - 同步
-
生产者是否同步。
违约:
false
. - 发送超时表达式
-
根据传出消息计算的 SpEL 表达式,用于在启用同步发布时评估等待 ack 的时间,例如,
headers['mySendTimeout']
. 超时值以毫秒为单位。 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]
. 现在,在转换有效负载之前对表达式进行计算。违约:
none
. - batchTimeout
-
生产者在发送消息之前等待多长时间,以允许更多消息在同一批次中累积。 (通常,生产者根本不会等待,而只是发送上一次发送过程中累积的所有消息。非零值可能会以延迟为代价来增加吞吐量。
违约:
0
. - messageKey表达式
-
根据用于填充生成的 Kafka 消息的密钥的传出消息进行评估的 SpEL 表达式,例如
headers['myKey']
. 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]
. 现在,在转换有效负载之前对表达式进行评估。对于常规处理器(Function<String, String>
或Function<Message<?>, Message<?>
),如果生成的密钥需要与来自主题的传入密钥相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
对于响应式函数,需要牢记一个重要的警告。在这种情况下,由应用程序手动将标头从传入消息复制到出站消息。您可以设置标头,例如myKey
并使用headers['myKey']
如上所述,或者为方便起见,只需将KafkaHeaders.MESSAGE_KEY
header,并且您根本不需要设置此属性。违约:
none
. - headerPatterns
-
以逗号分隔的简单模式列表,用于匹配要映射到 Kafka 的 Spring 消息传递标头
Headers
在ProducerRecord
. 模式可以以通配符(星号)开头或结尾。 模式可以通过前缀!
. 匹配在第一次匹配(正或负)后停止。 例如!ask,as*
将通过ash
但不是ask
.id
和timestamp
永远不会映射。默认值:(所有标头 - 除了
*
id
和timestamp
) - 配置
-
映射包含通用 Kafka 生产者属性的键/值对。 这
bootstrap.servers
属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空地图。
- topic.properties
-
一个
Map
配置新主题时使用的 Kafka 主题属性——例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。 在预配新主题时使用。 请参阅
NewTopic
Javadocs 中的kafka-clients
罐。默认值:无。
- topic.replication-factor
-
预配主题时要使用的复制因子。覆盖活页夹范围的设置。 如果出现以下情况,则忽略
replicas-assignments
存在。默认值:无(使用活页夹范围的默认值 -1)。
- 使用主题标头
-
设置为
true
将默认绑定目标(主题名称)替换为KafkaHeaders.TOPIC
出站邮件中的邮件头。 如果标头不存在,则使用默认绑定目标。违约:
false
. - 记录元数据通道
-
的 bean 名称
MessageChannel
应将成功的发送结果发送到哪个;Bean 必须存在于应用程序上下文中。 发送到通道的消息是带有附加标头的已发送消息(转换后,如果有)KafkaHeaders.RECORD_METADATA
. 标头包含一个RecordMetadata
Kafka 客户端提供的对象;它包括在主题中写入记录的分区和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败的发送将进入生产者错误通道(如果已配置);请参阅 Kafka 错误通道。
默认值:null。
Kafka 绑定器使用partitionCount 将生产者设置为提示,以创建具有给定分区计数的主题(结合minPartitionCount ,两者中的最大值是正在使用的值)。
配置两者时要小心minPartitionCount 用于活页夹和partitionCount 对于应用程序,因为使用较大的值。
如果主题已存在分区计数较小且autoAddPartitions 禁用(默认值),则活页夹无法启动。
如果主题已存在分区计数较小且autoAddPartitions 启用,则添加新分区。
如果主题已存在分区数大于 (minPartitionCount 或partitionCount ),则使用现有分区计数。 |
- 压缩
-
将
compression.type
producer 属性。 支持的值包括none
,gzip
,snappy
,lz4
和zstd
. 如果您覆盖kafka-clients
jar 到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用zstd
压缩, 使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
.违约:
none
. - 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager
用于覆盖此绑定的绑定程序的事务管理器。 如果您想将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager
. 要实现记录的一次使用和生产,使用者和生产者绑定必须全部配置为相同的事务管理器。默认值:无。
- close超时
-
关闭生产者时等待的超时秒数。
违约:
30
- 允许非事务性
-
通常,与事务性绑定器关联的所有输出绑定都将在新事务中发布(如果尚未处理)。 此属性允许您覆盖该行为。 如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非事务已在处理中。
违约:
false