配置选项

本节包含 Kafka Streams 绑定器使用的配置选项。spring-doc.cadn.net.cn

有关与 binder 相关的常见配置选项和属性,请参阅核心文档spring-doc.cadn.net.cn

Kafka Streams Binder 属性

以下属性在活页夹级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.在 Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性都必须以spring.cloud.stream.kafka.streams.binder而不是spring.cloud.stream.kafka.binder. 此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,任一前缀都有效。spring-doc.cadn.net.cn

配置

映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以spring.cloud.stream.kafka.streams.binder.. 以下是使用此属性的一些示例。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfigApache Kafka Streams 文档中的 JavaDocs。 您可以从StreamsConfig可以通过这个设置。 使用此属性时,它适用于整个应用程序,因为这是活页夹级别的属性。 如果应用程序中有多个处理器,则所有处理器都将获取这些属性。 对于像application.id,这将成为问题,因此您必须仔细检查属性如何StreamsConfig使用此活页夹级别进行映射configuration财产。spring-doc.cadn.net.cn

functions.<function-bean-name>.applicationId

仅适用于功能型处理器。 这可用于设置应用程序中每个功能的应用程序 ID。 在多个函数的情况下,这是设置应用程序 ID 的便捷方法。spring-doc.cadn.net.cn

functions.<function-bean-name>.configuration

仅适用于功能型处理器。 映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于活页夹级别configuration属性描述,但这个级别的configuration属性仅针对命名函数进行限制。 当您有多个处理器并且想要根据特定函数限制对配置的访问时,您可能需要使用它。 都StreamsConfig属性可以在此处使用。spring-doc.cadn.net.cn

经纪人

代理 URLspring-doc.cadn.net.cn

违约:localhostspring-doc.cadn.net.cn

zk节点

Zookeeper URLspring-doc.cadn.net.cn

违约:localhostspring-doc.cadn.net.cn

反序列化ExceptionHandler

反序列化错误处理程序类型。 此处理程序在绑定程序级别应用,因此应用于应用程序中的所有输入绑定。 有一种方法可以在消费者绑定级别以更细粒度的方式控制它。 可能的值是 -logAndContinue,logAndFail,skipAndContinuesendToDlqspring-doc.cadn.net.cn

违约:logAndFailspring-doc.cadn.net.cn

应用程序 Id

在绑定器级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个函数,则应用程序 ID 应以不同的方式设置。 请参阅上面详细讨论设置应用程序 ID。spring-doc.cadn.net.cn

默认:应用程序将生成静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。spring-doc.cadn.net.cn

stateStoreRetry.maxAttempts

尝试连接到状态存储的最大尝试次数。spring-doc.cadn.net.cn

默认值:1spring-doc.cadn.net.cn

stateStoreRetry.backoffPeriod

尝试在重试时连接到状态存储时的回退期。spring-doc.cadn.net.cn

默认值:1000 毫秒spring-doc.cadn.net.cn

consumer属性

活页夹级别的任意使用者属性。spring-doc.cadn.net.cn

producer属性

绑定器级别的任意生产者属性。spring-doc.cadn.net.cn

includeStoppedProcessorsForHealthCheck

当处理器的绑定通过执行器停止时,默认情况下,该处理器将不参与健康检查。 将此属性设置为true为所有处理器启用运行状况检查,包括当前通过绑定执行器端点停止的处理器。spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

Kafka Streams 生产者属性

以下属性适用于 Kafka Streams 生产者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.为方便起见,如果有多个输出绑定并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer..spring-doc.cadn.net.cn

keySerde

要使用的密钥 Serdespring-doc.cadn.net.cn

默认值:请参阅上面关于消息反序列化的讨论spring-doc.cadn.net.cn

值Serde

值 serde 使用spring-doc.cadn.net.cn

默认值:请参阅上面关于消息反序列化的讨论spring-doc.cadn.net.cn

使用原生编码

标志来启用/禁用本机编码spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

streamPartitionerBeanName (流分区器BeanName)

要在使用者处使用的自定义出站分区器 Bean 名称。 应用程序可以提供自定义StreamPartitioner作为 Spring Bean,并且可以将此 Bean 的名称提供给生产者以代替默认名称。spring-doc.cadn.net.cn

默认值:请参阅上面有关出站分区支持的讨论。spring-doc.cadn.net.cn

生产作为

处理器要生成到的接收器组件的自定义名称。spring-doc.cadn.net.cn

违约:none(由 Kafka Streams 生成)spring-doc.cadn.net.cn

Kafka Streams 消费者属性

以下属性可用于 Kafka Streams 使用者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.为方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer..spring-doc.cadn.net.cn

应用程序 Id

为每个输入绑定设置 application.id。spring-doc.cadn.net.cn

默认值:见上文。spring-doc.cadn.net.cn

keySerde

要使用的密钥 Serdespring-doc.cadn.net.cn

默认值:请参阅上面关于消息反序列化的讨论spring-doc.cadn.net.cn

值Serde

值 serde 使用spring-doc.cadn.net.cn

默认值:请参阅上面关于消息反序列化的讨论spring-doc.cadn.net.cn

具体化为

状态存储在使用传入的 KTable 类型时实现spring-doc.cadn.net.cn

违约:none.spring-doc.cadn.net.cn

使用原生解码

标志来启用/禁用本机解码spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

dlq名称

DLQ 主题名称。spring-doc.cadn.net.cn

默认值:请参阅上文有关错误处理和 DLQ 的讨论。spring-doc.cadn.net.cn

开始偏移量

如果没有要使用的已提交偏移量,则从偏移量开始。 这主要用于消费者第一次使用某个主题时。 Kafka Streams 使用earliest作为默认策略,活页夹使用相同的默认值。 这可以重写为latest使用此属性。spring-doc.cadn.net.cn

违约:earliest.spring-doc.cadn.net.cn

注意:使用resetOffsets对 Kafka Streams 绑定器没有任何影响。 与基于消息通道的绑定器不同,Kafka Streams 绑定器不会寻求按需开始或结束。spring-doc.cadn.net.cn

反序列化ExceptionHandler

反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面所述的绑定器级别属性。 可能的值是 -logAndContinue,logAndFail,skipAndContinuesendToDlqspring-doc.cadn.net.cn

违约:logAndFailspring-doc.cadn.net.cn

timestampExtractorBeanName

要在使用者处使用的特定时间戳提取器 Bean 名称。 应用程序可以提供TimestampExtractor作为 Spring bean,并且可以将此 bean 的名称提供给消费者以代替默认名称。spring-doc.cadn.net.cn

默认值:请参阅上面关于时间戳提取器的讨论。spring-doc.cadn.net.cn

事件类型

此绑定支持的事件类型的逗号分隔列表。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

eventTypeHeaderKey

事件类型标头键。spring-doc.cadn.net.cn

违约:event_typespring-doc.cadn.net.cn

consumedAs

处理器从中消费的源组件的自定义名称。spring-doc.cadn.net.cn

亲爱的:none(由 Kafka Streams 生成)spring-doc.cadn.net.cn

并发特别说明

在 Kafka Streams 中,您可以使用num.stream.threads财产。 这,您可以使用各种configuration上述 binder、functions、producer 或 consumer 级别下的选项。 您还可以使用concurrency核心 Spring Cloud Stream 为此目的提供的属性。 使用此功能时,您需要在消费者上使用它。 当有多个输入绑定时,请在第一个输入绑定上设置此值。 例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency,它将被翻译为num.stream.threads通过活页夹。 如果您有多个处理器,并且一个处理器定义绑定级别并发,但不定义其他处理器,则那些没有绑定级别并发的处理器将默认返回通过spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads. 如果此 Binder 配置不可用,则应用程序将使用 Kafka Streams 设置的默认设置。spring-doc.cadn.net.cn