配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关与 binder 相关的常见配置选项和属性,请参阅核心文档。
Kafka Streams Binder 属性
以下属性在活页夹级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.
在 Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性都必须以spring.cloud.stream.kafka.streams.binder
而不是spring.cloud.stream.kafka.binder
.
此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,任一前缀都有效。
- 配置
-
映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以
spring.cloud.stream.kafka.streams.binder.
. 以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfig
Apache Kafka Streams 文档中的 JavaDocs。
您可以从StreamsConfig
可以通过这个设置。
使用此属性时,它适用于整个应用程序,因为这是活页夹级别的属性。
如果应用程序中有多个处理器,则所有处理器都将获取这些属性。
对于像application.id
,这将成为问题,因此您必须仔细检查属性如何StreamsConfig
使用此活页夹级别进行映射configuration
财产。
- functions.<function-bean-name>.applicationId
-
仅适用于功能型处理器。 这可用于设置应用程序中每个功能的应用程序 ID。 在多个函数的情况下,这是设置应用程序 ID 的便捷方法。
- functions.<function-bean-name>.configuration
-
仅适用于功能型处理器。 映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于活页夹级别
configuration
属性描述,但这个级别的configuration
属性仅针对命名函数进行限制。 当您有多个处理器并且想要根据特定函数限制对配置的访问时,您可能需要使用它。 都StreamsConfig
属性可以在此处使用。 - 经纪人
-
代理 URL
违约:
localhost
- zk节点
-
Zookeeper URL
违约:
localhost
- 反序列化ExceptionHandler
-
反序列化错误处理程序类型。 此处理程序在绑定程序级别应用,因此应用于应用程序中的所有输入绑定。 有一种方法可以在消费者绑定级别以更细粒度的方式控制它。 可能的值是 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
违约:
logAndFail
- 应用程序 Id
-
在绑定器级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个函数,则应用程序 ID 应以不同的方式设置。 请参阅上面详细讨论设置应用程序 ID。
默认:应用程序将生成静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大尝试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
尝试在重试时连接到状态存储时的回退期。
默认值:1000 毫秒
- consumer属性
-
活页夹级别的任意使用者属性。
- producer属性
-
绑定器级别的任意生产者属性。
- includeStoppedProcessorsForHealthCheck
-
当处理器的绑定通过执行器停止时,默认情况下,该处理器将不参与健康检查。 将此属性设置为
true
为所有处理器启用运行状况检查,包括当前通过绑定执行器端点停止的处理器。默认值:false
Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为方便起见,如果有多个输出绑定并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.
.
- keySerde
-
要使用的密钥 Serde
默认值:请参阅上面关于消息反序列化的讨论
- 值Serde
-
值 serde 使用
默认值:请参阅上面关于消息反序列化的讨论
- 使用原生编码
-
标志来启用/禁用本机编码
违约:
true
. - streamPartitionerBeanName (流分区器BeanName)
-
要在使用者处使用的自定义出站分区器 Bean 名称。 应用程序可以提供自定义
StreamPartitioner
作为 Spring Bean,并且可以将此 Bean 的名称提供给生产者以代替默认名称。默认值:请参阅上面有关出站分区支持的讨论。
- 生产作为
-
处理器要生成到的接收器组件的自定义名称。
违约:
none
(由 Kafka Streams 生成)
Kafka Streams 消费者属性
以下属性可用于 Kafka Streams 使用者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.
.
- 应用程序 Id
-
为每个输入绑定设置 application.id。
默认值:见上文。
- keySerde
-
要使用的密钥 Serde
默认值:请参阅上面关于消息反序列化的讨论
- 值Serde
-
值 serde 使用
默认值:请参阅上面关于消息反序列化的讨论
- 具体化为
-
状态存储在使用传入的 KTable 类型时实现
违约:
none
. - 使用原生解码
-
标志来启用/禁用本机解码
违约:
true
. - dlq名称
-
DLQ 主题名称。
默认值:请参阅上文有关错误处理和 DLQ 的讨论。
- 开始偏移量
-
如果没有要使用的已提交偏移量,则从偏移量开始。 这主要用于消费者第一次使用某个主题时。 Kafka Streams 使用
earliest
作为默认策略,活页夹使用相同的默认值。 这可以重写为latest
使用此属性。违约:
earliest
.
注意:使用resetOffsets
对 Kafka Streams 绑定器没有任何影响。
与基于消息通道的绑定器不同,Kafka Streams 绑定器不会寻求按需开始或结束。
- 反序列化ExceptionHandler
-
反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面所述的绑定器级别属性。 可能的值是 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
违约:
logAndFail
- timestampExtractorBeanName
-
要在使用者处使用的特定时间戳提取器 Bean 名称。 应用程序可以提供
TimestampExtractor
作为 Spring bean,并且可以将此 bean 的名称提供给消费者以代替默认名称。默认值:请参阅上面关于时间戳提取器的讨论。
- 事件类型
-
此绑定支持的事件类型的逗号分隔列表。
违约:
none
- eventTypeHeaderKey
-
事件类型标头键。
违约:
event_type
- consumedAs
-
处理器从中消费的源组件的自定义名称。
亲爱的:
none
(由 Kafka Streams 生成)
并发特别说明
在 Kafka Streams 中,您可以使用num.stream.threads
财产。
这,您可以使用各种configuration
上述 binder、functions、producer 或 consumer 级别下的选项。
您还可以使用concurrency
核心 Spring Cloud Stream 为此目的提供的属性。
使用此功能时,您需要在消费者上使用它。
当有多个输入绑定时,请在第一个输入绑定上设置此值。
例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency
,它将被翻译为num.stream.threads
通过活页夹。
如果您有多个处理器,并且一个处理器定义绑定级别并发,但不定义其他处理器,则那些没有绑定级别并发的处理器将默认返回通过spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
.
如果此 Binder 配置不可用,则应用程序将使用 Kafka Streams 设置的默认设置。