|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
配置选项
本节包含Apache Kafka绑定器使用的配置选项。
对于与绑定器相关的常见配置选项和属性,请参阅核心文档中的
Kafka 绑定属性
- spring.cloud.stream.kafka.binder.brokers
-
与Kafka绑定连接的经纪人的列表。
默认值:
localhost。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers允许不带端口信息的主机(例如,host1,host2:port2)。这设置了没有在代理列表中配置端口时的默认端口。默认值:
9092。 - spring.cloud.stream.kafka.binder.configuration
-
由绑定器创建的所有客户端传递的客户端属性(生产者和使用者)的键/值映射。 <br> 由于这些属性同时用于生产者和使用者,因此应将其限制使用于通用属性,例如安全性设置。 <br> 通过此配置提供的任何未知的Kafka生产者或使用者属性都会被筛选掉,不允许传播。 <br> 此处的属性会覆盖在启动中设置的任何属性。
默认值:空映射。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意Kafka客户端消费者属性的键值映射。除了支持已知的Kafka消费者属性外,还允许在此处设置未知的消费者属性。此处设置的属性会覆盖在boot和
configuration属性中设置的属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.headers
-
列表中包含传输程序运输的自定义标题。仅在与旧版应用程序(< 1.3.x)通信时需要(使用版本号< 0.11.0.0)。较新版本(版本号> 0.11.0.0)原生支持标头。
默认:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
要等待获取分区信息的时间,以秒为单位。健康报告在计时器到期时变为离线状态。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
在broker上所需确认的数量。 见producer的Kafka文档
acks属性。默认值:
1。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在设置为
autoCreateTopics或autoAddPartitions时有效。生产或消费数据的话题上配置绑定器的全局最小分区数。可以被partitionCount设置的生产者或instanceCount * concurrency设置的生产者(如果更大)所取代。默认值:
1。 - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,这里也允许使用未知的生产者属性。 这里的属性会覆盖在 boot 和上一个
configuration属性中设置的任何属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.replicationFactor
-
自动创建主题的复制因子,如果为
autoCreateTopics则有效。 可以在每个绑定上重写它。如果您使用的是 2.4 版本之前的 Kafka 代理版本,则此值应设置为至少 1。从版本 3.0.8 开始,绑定器默认使用
-1,表示将使用代理的 'default.replication.factor' 属性来确定副本数量。请与您的 Kafka 代理管理员确认是否有要求最小复制因子的策略,如果是这样,通常
default.replication.factor将匹配该值,而-1应在不需要大于最小值的复制因子时使用。默认值:
-1。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true,绑定器会自动创建新主题。
如果设置为false,绑定器依赖于主题已经配置好。
在后一种情况下,如果主题不存在,绑定器将无法启动。这个设置独立于代理的
auto.create.topics.enable设置,并不影响它。如果服务器设置为自动创建主题,它们可能作为元数据检索请求的一部分被创建,具有默认的代理设置。
默认值:
true。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true,则在必要时创建新分区。如果设置为false,则依赖于目标主题的分区大小已经配置。如果目标主题的分区数小于预期值,绑定程序将无法启动。默认值:
false。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在绑定程序中启用事务。请参阅Kafka文档中的
transaction.id和spring-kafka文档中的https://docs.spring.io/spring-kafka/reference/html/#transactions。当启用事务时,单独的producer属性被忽略,所有生产者使用spring.cloud.stream.kafka.binder.transaction.producer.*属性。默认的
null(无事务) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务绑定程序中生产者的全局生产者属性。
有关所有绑定程序支持的通用生产者属性,请参阅
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix和Kafka Producer Properties。默认值:请参阅各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用于将 Kafka 标头映射到和从 Kafka 标头进行映射的
KafkaHeaderMapper的 Bean 名称。
例如,如果您希望自定义使用 JSON 反序列化处理标头的BinderHeaderMapperbean 中的信任包集,则可以使用此功能。
如果未使用此属性提供此自定义BinderHeaderMapperbean,则绑定器将查找名称为kafkaBinderHeaderMapper且类型为BinderHeaderMapper的标头映射器 bean,然后回退到由绑定器创建的默认BinderHeaderMapper。默认值为 no.
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
标志,用于设置绑定器健康状态为
down,当发现主题上的任何分区没有领导者时(无论哪个消费者正在从中接收数据)。默认值:
true。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
当信任库或密钥库证书位置被指定为非本地文件系统资源(由org支持的资源)时。springframework.core.io.资源E。g.CLASSPATH、HTTP 等等。该绑定程序会将资源从可转换为org的路径复制到org。springframework.core.io.(类路径资源)到文件系统上的位置。This is true for both broker level certificates (0 and 1) and certificates intended for schema registry (2 and 3).请注意,信任库和密钥库的位置路径必须在
spring.cloud.stream.kafka.binder.configuration…下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location等。
文件将在指定为该属性值的位置复制。该属性的值必须是文件系统上可由正在运行应用程序的进程写入的现有目录。If this value is not set and the certificate file is a non-local file system resource, then it will be copied to System’s temp directory as returned bySystem.getProperty("java.io.tmpdir").这也是正确的,如果存在此值,但文件夹在文件系统中找不到或不可写。默认值为 no.
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
当将其设置为true时,每次访问度量时都会计算每个consumer topic的偏移量滞后。当将其设置为false时,只使用定期计算的偏移量滞后。
默认值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
间隔用于计算每个消费者主题的偏移滞后。 当禁用 0 或其 计算太长时使用此值。
默认值:60秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此绑定器中启用所有绑定的 Micrometer 观察注册表。
(默认值:false)
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicatormetada consumergroup.id。 This consumer is used by theHealthIndicatorto query the metadata about the topics in use。默认值为 no.
Kafka 消费者属性
以下属性可供Kafka消费者使用,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。
避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.consumer.<property>=<value>。 |
- admin.configuration
-
{ 'html':'***' }
- admin.replicas-assignment
-
{ 'html':'***' }
- admin.replication-factor
-
{ 'html':'***' }
- autoRebalanceEnabled
-
当为
0时,主题分区会自动在消费者组成员之间重新平衡。当为
1时,每个消费者根据2和3分配一组固定的分区。这需要在每个已启动的实例上适当地设置
4和5属性。6的值在这种情况下通常必须大于 1。默认值:
true。 - 处理每条记录
-
当为
autoCommitOffset时,此设置确定是否在处理每个记录后提交偏移量。默认情况下,所有记录都已处理后,会提交所有记录的偏移量。
一次返回的记录数可以通过Kafka属性控制,该属性通过consumerconsumer.poll()属性进行设置。通过max.poll.records。
将此设置为configuration可能会导致性能下降,但这样做可以减少故障发生时重复传递记录的可能性。
另外,请参阅绑定器true属性,它也影响提交偏移量的性能。
此属性从3.1版本开始弃用,建议使用ackMode。
如果未设置ackMode且批处理模式未启用,则将使用RECORDackMode。默认值:
false。 - 自动提交偏移量
-
从版本 3.1 开始,此属性已过时。请参阅有关替代方案的详细信息。当消息已处理时,是否自动提交偏移量。如果设置为 1,在传入消息的标题中存在一个键为 2 的标题。标题的类型为 3。应用程序可以使用此标题进行确认消息。有关详细信息,请参阅示例部分。当此属性设置为 4 时,Kafka 绑定程序将确认模式设置为 5,并且应用程序负责确认记录。另请参阅 6。
默认值:
true。 - 确认模式
-
指定容器确认模式。 此模式基于 Spring Kafka 中定义的 AckMode 枚举进行设置。 如果
ackEachRecord属性设置为true,且消费者未处于批处理模式,则使用确认模式RECORD,否则,使用此属性提供的确认模式。 - 自动提交错误
-
在可轮询的使用者中,如果设置为
true,则在出错时始终自动提交。如果未设置(默认)或false,则在可轮询的使用者中不自动提交。请注意,此属性仅适用于可轮询的使用者。默认值:未设置。
- 重置偏移量
-
是否将消费者的偏移量重置为提供的startOffset值。必须为false,如果提供了一个
KafkaBindingRebalanceListener;有关此属性的更多信息,请参阅rebalance监听器见重置-偏移量以了解更多信息。默认值:
false。 - startOffset
-
新组的起始偏移量。允许的值:
earliest和latest。
如果消费者组通过spring.cloud.stream.bindings.<channelName>.group显式设置给消费者 'binding'(在earliest设置),则 'startOffset' 被设为latest。否则,对于anonymous消费者组,它被设为xref page。
有关此属性的更多信息,请参阅reset-offsets。默认值:null(等同于
earliest)。 - 启用死信队列
-
当设置为 true 时,它会启用消费者的死信队列(DLQ)行为。
默认情况下,导致错误的消息会被转发到名为error.<destination>.<group>的主题。
通过设置dlqName属性或定义类型为DlqDestinationResolver的@Bean,可以配置 DLQ 主题名称。
这对于错误数量相对较少且重放整个原始主题可能过于繁琐的情况,提供了替代于更常见的 Kafka 重放场景的选项。
有关更多信息,请参阅 kafka 死信队列处理。
从版本 2.0 开始,发送到 DLQ 主题的消息增强了以下标头:x-original-topic、x-exception-message和x-exception-stacktrace,格式为byte[]。
默认情况下,失败的记录被发送到与原记录相同的分区编号中的 DLQ 主题。
请参阅 dlq 分区选择 了解如何更改该行为。
当destinationIsPattern为true时不允许使用。默认值:
false。 - 分区
-
当
enableDlq为真,且未设置此属性时,将创建一个与主主题分区数相同的死信队列。
通常情况下,死信记录会被发送到死信队列中的相同分区作为原始记录。
可以更改此行为;请参阅死信队列分区选择。
如果将此属性设置为1,并且没有DqlPartitionFunction的bean,则所有死信记录将写入分区0。
如果此属性大于1,则您必须提供DlqPartitionFunction的bean。
请注意,实际分区数量受绑定器的minPartitionCount属性影响。默认值:
none - 配置
-
包含通用 Kafka 消费者属性的键/值对映射。除了具有 Kafka 消费者属性外,还可以传递其他配置属性。例如应用程序所需的某些属性,如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。bootstrap.servers属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空映射。
- 死信队列名称
-
接收错误消息的死信队列(DLQ)主题的名称。
默认值:null(如果未指定,则导致错误的消息会转发到名为
error.<destination>.<group>的主题)。 - 延迟消息生产者属性
-
使用此功能,可以设置与死信队列(DLQ)相关的生产者属性。
所有可以通过 Kafka 生产者属性设置的属性都可以通过此属性进行设置。
当在消费者上启用原生解码时(即 useNativeDecoding: true),应用程序必须为 DLQ 提供相应的键/值序列化器。
这必须以dlqProducerProperties.configuration.key.serializer和dlqProducerProperties.configuration.value.serializer的形式提供。默认:Kafka 生产者默认属性。
- standardHeaders
-
指示入站通道适配器填充哪些标准标头。
允许值:none,id,timestamp,或both。
如果使用原生反序列化并且接收消息的第一个组件需要一个id(例如配置为使用JDBC消息存储的聚合器),则很有用。默认值:
none - 转换器Bean名称
-
实现
RecordMessageConverter的bean的名称。用于入站通道适配器中,以替换默认的MessagingMessageConverter。默认值:
null - 空闲事件间隔
-
表示最近未收到消息的事件之间的时间间隔(以毫秒为单位)。 使用
ApplicationListener<ListenerContainerIdleEvent>可接收这些事件。 有关用法示例,请参阅暂停-恢复。默认值:
30000 - 目的地是模式
-
当为 true 时,目的地被视为正则表达式
Pattern,代理用其匹配主题名称。
当为 true 时,不会配置主题,并且不允许使用enableDlq,因为在配置阶段绑定器不知道主题名称。
请注意,检测与模式匹配的新主题所需的时间由消费者属性metadata.max.age.ms控制(在编写本文时),默认值为 300,000 毫秒(5 分钟)。
可以使用上面的configuration属性进行配置。默认值:
false - topic.properties
-
用于在配置新主题时使用的Kafka主题属性的
Map——例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0默认值为 no.
- topic.replicas-assignment
-
一个<Integer, List<Integer>> 的副本分配映射,其中键为分区,值为分配。
用于配置新主题时使用。
参见 <code>0</code> Java 文档在 <code>1</code> jar 中的说明。
默认值为 no.
- topic.replication-factor
-
配置主题时要使用的复制因子。覆盖绑定器范围内的设置。
如果存在replicas-assignments,则忽略此设置。缺点:无(使用绑定器公共默认值-1)。}
- 轮询超时
-
轮询中用于可轮询消费者超时时间。
默认值:5秒。
- 事务管理器
-
用于覆盖此绑定的Binder事务管理器的
KafkaAwareTransactionManager的Bean名称。如果要使用ChainedKafkaTransactionManaager将另一个事务与Kafka事务同步,则通常需要它。为了实现记录的精确一次消费和生产,必须为所有消费者和生产者绑定配置相同的事务管理器。默认值为 no.
- 事务提交恢复
-
使用事务性绑定器时,恢复记录(例如重试次数耗尽后发送到死信主题的记录)的偏移量将默认通过新事务进行提交。将此属性设置为
false会抑制已恢复记录的偏移量提交。默认值:true。
- 全局错误处理程序Bean名称
-
CommonErrorHandler每个消费者绑定使用的bean名称。当提供时,此用户提供的CommonErrorHandler优先于捆绑器定义的任何其他错误处理器。ListenerContainerCustomizer不使用并检查目标/组组合来设置错误处理器的应用程序可以方便地表达错误处理器。默认值为 no.
Kafka 生产者属性
仅适用于Kafka生产者的以下属性可用,并且必须使用spring.cloud.stream.kafka.bindings.<channelName>.producer.作为前缀。
避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.producer.<property>=<value>。 |
- admin.configuration
-
{ 'html':'***' }
- admin.replicas-assignment
-
{ 'html':'***' }
- admin.replication-factor
-
{ 'html':'***' }
- 缓冲区大小
-
Kafka 生产者在发送前尝试批处理数据的最大字节数上限。
默认值:
16384。 - 同步
-
生产者是否为同步的。
默认值:
false。 - 发送超时表达式
-
在启用同步发布时,针对传出消息评估的 SpEL 表达式用于确定等待确认的时间(以毫秒为单位)——例如,
headers['mySendTimeout']。
当版本低于 3.0 时,除非使用原生编码,否则无法使用负载,因为在评估此表达式时,负载已经是byte[]的形式。现在,在转换负载之前就评估该表达式。默认值:
none。 - 批量超时
-
生产者在发送同一批次中的消息之前等待多长时间以允许更多消息累积。(通常,生产者不会等待,并且会立即发送所有已累积的消息。非零值可能会增加吞吐量,但会增加延迟)。
默认值:
0。 - 消息键表达式
-
针对传出消息求值的 SpEL 表达式,用于填充生成的 Kafka 消息的键——例如,
headers['myKey']。在 3.0 版本之前,除非使用原生编码,否则无法使用负载,因为在表达式被评估时,负载已经是byte[]的形式。现在,在负载转换之前就对表达式进行评估。对于常规处理器(Function<String, String>或Function<Message<?>, Message<?>),如果要使生成的键与来自主题的传入键相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']对于反应函数,请记住一个重要注意事项。在这种情况下,应用程序必须手动将传入消息的头复制到传出消息中。您可以设置标头,如myKey并按照上述方法使用headers['myKey'],或者为了方便起见,只需设置KafkaHeaders.MESSAGE_KEY标头即可,根本不需要设置此属性。默认值:
none。 - headerPatterns
-
用于匹配Spring消息头并映射到Kafka
Headers的简单模式的逗号分隔列表,位于ProducerRecord中。
模式可以以通配符(星号)开头或结尾。
可以通过在前面加上!来否定模式。
匹配将在第一次匹配(正向或负向)后停止。
例如!ask,as*将通过ash但不通过ask。id和timestamp永远不会被映射。默认值:
*(所有请求头,除了id和timestamp) - 配置
-
包含通用 Kafka 生产者属性的键/值对映射。
bootstrap.servers属性不能在其中设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空映射。
- topic.properties
-
用于在配置新主题时使用的Kafka主题属性的
Map——例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0 - topic.replicas-assignment
-
一个<Integer, List<Integer>> 的副本分配映射,其中键为分区,值为分配。
用于配置新主题时使用。
参见 <code>0</code> Java 文档在 <code>1</code> jar 中的说明。
默认值为 no.
- topic.replication-factor
-
配置主题时要使用的复制因子。覆盖绑定器范围内的设置。
如果存在replicas-assignments,则忽略此设置。缺点:无(使用绑定器公共默认值-1)。}
- 使用主题标题
-
设置为
true,以使用传出消息中的KafkaHeaders.TOPIC消息头的值来覆盖默认绑定目标(主题名称)。如果不存在该头部,则使用默认绑定目标。默认值:
false。 - 记录元数据通道
-
成功发送结果应被发送到的
MessageChannel的bean名称;该bean必须存在于应用程序上下文中。
发送到通道的消息是已转换(如果有)的已发送消息,并带有额外的标头KafkaHeaders.RECORD_METADATA。
此标头包含Kafka客户端提供的RecordMetadata对象;它包括记录在主题中写入的分区和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)失败发送的消息会进入生产者的错误通道(如果已配置);参见Kafka 错误通道。
默认值:null。
Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以创建具有给定分区数目的主题(结合minPartitionCount,两者中的最大值被用作该值)。当同时为绑定器配置 如果一个已经存在的主题分区数目较小,并且 如果一个已经存在的主题分区数目较小,并且 如果一个已存在的主题分区数目大于 ( |
- 压缩
-
设置
compression.type生产者属性。
支持的值为none、gzip、snappy、lz4和zstd。
如果您覆盖kafka-clientsjar到2.1.0(或更高版本),如Apache Kafka文档中的Spring讨论,并希望使用zstd压缩,请使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd。默认值:
none。 - 事务管理器
-
用于覆盖此绑定的Binder事务管理器的
KafkaAwareTransactionManager的Bean名称。如果要使用ChainedKafkaTransactionManaager将另一个事务与Kafka事务同步,则通常需要它。为了实现记录的精确一次消费和生产,必须为所有消费者和生产者绑定配置相同的事务管理器。默认值为 no.
- 关闭超时
-
关闭生产者时等待的超时时间(以秒为单位)。
默认值:
30 - 允许非事务性
-
通常,与事务绑定器关联的所有输出绑定都会在新事务中发布,除非已有事务正在运行。此属性允许您覆盖该行为。如果设置为 true,则发布的记录将不会在此输出绑定中运行事务,除非已经存在一个事务。
默认值:
false