这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 spring-doc.cadn.net.cn

配置选项

本节包含Kafka Streams绑定器使用的配置选项。spring-doc.cadn.net.cn

有关绑定器的常见配置选项和属性,请参阅核心文档spring-doc.cadn.net.cn

流处理绑定属性

绑定器级别的以下属性可用,并且必须使用spring.cloud.stream.kafka.streams.binder.作为前缀。在Kafka流绑定器中重复使用的任何由提供的Kafka绑定器属性都必须使用spring.cloud.stream.kafka.streams.binder而不是spring.cloud.stream.kafka.binder作为前缀。此规则唯一的例外是在定义Kafka引导服务器属性时,任一前缀都可以工作。spring-doc.cadn.net.cn

配置

包含与 Apache Kafka Streams API 相关的属性键/值对映射。此属性必须使用 spring.cloud.stream.kafka.streams.binder. 前缀。以下是一些使用此属性的示例。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关流配置中可能包含的所有属性的更多信息,请参阅Apache Kafka Streams文档中的StreamsConfig JavaDoc。可以通过StreamsConfig设置的任何配置都可以通过此方式进行设置。使用此属性时,由于这是绑定器级别的属性,因此适用于整个应用程序。如果您的应用程序中有多个处理器,则它们都将获取这些属性。在像application.id这样的情况下,这将变得有问题,因此您必须仔细检查如何使用此绑定器级别configuration属性映射来自StreamsConfig的属性。spring-doc.cadn.net.cn

functions.<function-bean-name>.applicationId

仅适用于函数式处理器。 可以用于为应用程序中的每个功能设置应用ID。 在有多个功能的情况下,这是一种方便的方式来设置应用ID。spring-doc.cadn.net.cn

functions.<function-bean-name>.configuration

仅适用于函数式风格处理器。
包含与Apache Kafka Streams API相关的属性的键/值对映射。
这类似于上述描述的绑定器级别的configuration属性,但这个configuration级别的属性仅限于命名函数使用。
当您有多个处理器并且希望根据特定函数限制配置访问时,可能需要使用此功能。
所有StreamsConfig属性都可以在此处使用。spring-doc.cadn.net.cn

经纪人

代理网址spring-doc.cadn.net.cn

默认值: localhostspring-doc.cadn.net.cn

zk节点

Zookeeper 地址spring-doc.cadn.net.cn

默认值: localhostspring-doc.cadn.net.cn

反序列化异常处理程序

反序列化错误处理器类型。 此处理器应用于绑定器级别,因此对应用程序中的所有输入绑定都适用。 在消费者绑定级别可以更细致地控制它。 可能的值为 - logAndContinuelogAndFailskipAndContinuesendToDlqspring-doc.cadn.net.cn

默认值: logAndFailspring-doc.cadn.net.cn

applicationId

在绑定器级别全局设置 Kafka Streams 应用程序的应用程序 ID 的便捷方法。如果应用程序包含多个函数,则应分别设置应用程序 ID。详细讨论请参阅上文。spring-doc.cadn.net.cn

默认情况下,应用程序会生成一个静态的应用程序ID。有关更多详细信息,请参阅应用程序ID部分。spring-doc.cadn.net.cn

stateStoreRetry.maxAttempts

尝试连接到状态存储的最大重试次数。spring-doc.cadn.net.cn

默认值:1spring-doc.cadn.net.cn

stateStoreRetry.backoffPeriod

重试时尝试连接到状态存储的退避周期。spring-doc.cadn.net.cn

默认值:1000 毫秒spring-doc.cadn.net.cn

消费者属性

绑定器级别的任意消费者属性。spring-doc.cadn.net.cn

生产者属性

绑定器级别的任意生产者属性。spring-doc.cadn.net.cn

包含已停止处理器的健康检查

当通过执行器停止处理器的绑定时,此处理器默认将不参与健康检查。将此属性设置为 true 可启用所有处理器(包括当前通过绑定执行器端点停止的处理器)的健康检查。spring-doc.cadn.net.cn

(默认值:false)spring-doc.cadn.net.cn

Kafka Streams Producer 属性

以下属性仅适用于Kafka Streams生产者,且必须使用spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.作为前缀。
为了方便起见,如果存在多个输出绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.进行配置。spring-doc.cadn.net.cn

密钥序列化器

键序列化器/反序列化器spring-doc.cadn.net.cn

默认:参见上述关于消息序列化/反序列化的讨论spring-doc.cadn.net.cn

值序列化器

序列化/反序列化器spring-doc.cadn.net.cn

默认:参见上述关于消息序列化/反序列化的讨论spring-doc.cadn.net.cn

使用原生编码

启用/禁用原生编码的标志spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

流分区程序Bean名称

消费者处要使用的自定义传出分区程序bean名称。
应用程序可以提供自定义StreamPartitioner作为Spring bean,该bean的名称可被提供给生产者以代替默认值使用。spring-doc.cadn.net.cn

默认:参见上文关于出站分区支持的讨论。spring-doc.cadn.net.cn

产生的

处理器生产数据的目标接收组件的自定义名称。spring-doc.cadn.net.cn

默认值:none(由Kafka Streams生成)spring-doc.cadn.net.cn

Kafka Streams 消费者属性

以下属性可用于Kafka Streams消费者,并且必须使用spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.作为前缀。为了方便起见,如果存在多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.进行配置。spring-doc.cadn.net.cn

applicationId

根据输入绑定设置 application.id。spring-doc.cadn.net.cn

默认值:见上文。spring-doc.cadn.net.cn

密钥序列化器

键序列化器/反序列化器spring-doc.cadn.net.cn

默认:参见上述关于消息序列化/反序列化的讨论spring-doc.cadn.net.cn

值序列化器

序列化/反序列化器spring-doc.cadn.net.cn

默认:参见上述关于消息序列化/反序列化的讨论spring-doc.cadn.net.cn

物化为

使用传入的 KTable 类型时,用于生成状态存储spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

缓存已禁用

为物化KTable禁用缓存。
当设置为true时,在Materialized对象上调用withCachingDisabled()
当设置为false时,在Materialized对象上调用withCachingEnabled()spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

日志记录已禁用

为物化KTable禁用日志记录。当设置为true时,会在Materialized对象上调用withLoggingDisabled()spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

使用原生解码

启用/禁用原生解码的标志spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

死信队列名称

死信队列主题名称。spring-doc.cadn.net.cn

默定:请参考上面错误处理和DLQ的讨论。spring-doc.cadn.net.cn

startOffset

如果消费者没有已提交的偏移量可以从中消费,则开始的位置偏移。 这主要用于当消费者第一次从主题中消费时的情况。 Kafka Streams 使用 earliest 作为默认策略,绑定器也使用相同的默认值。 可以通过此属性将其覆盖为 latestspring-doc.cadn.net.cn

默认值: earliestspring-doc.cadn.net.cn

注:在消费者中使用 resetOffsets 对 Kafka Streams 绑定器没有任何影响。 与基于消息通道的绑定器不同,Kafka Streams 绑定器不会根据需求寻求开始或结束位置。spring-doc.cadn.net.cn

反序列化异常处理程序

反序列化错误处理程序类型。此处理器按消费者绑定应用,而不是前面所述的绑定器级别属性。
可能的值为 - logAndContinuelogAndFailskipAndContinuesendToDlqspring-doc.cadn.net.cn

默认值: logAndFailspring-doc.cadn.net.cn

时间戳提取器Bean名称

消费者使用的特定时间戳提取器的bean名称。
应用程序可以提供TimestampExtractor作为Spring bean,并且此bean的名称可以提供给消费者,以代替默认值使用。spring-doc.cadn.net.cn

默认值:参见上文关于时间戳提取器的讨论。spring-doc.cadn.net.cn

事件类型

此绑定支持的事件类型的逗号分隔列表。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

事件类型标头键

通过此绑定传入的每个记录上的事件类型标题键。spring-doc.cadn.net.cn

默认值: event_typespring-doc.cadn.net.cn

已消费作为

处理器从中消费的源组件的自定义名称。spring-doc.cadn.net.cn

默认值:none(由 Kafka Streams 生成)spring-doc.cadn.net.cn

关于并发的特别说明

在 Kafka Streams 中,您可以使用 num.stream.threads 属性来控制处理器可以创建的线程数量。
您可以通过上述绑定器、函数、生产者或消费者级别的各种 configuration 选项来实现这一点。
您还可以使用核心 Spring Cloud Stream 提供的 concurrency 属性来达到此目的。
使用此属性时,需要将其应用于消费者。
当您有多个输入绑定时,在第一个输入绑定上设置它。
例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,将由绑定器转换为 num.stream.threads
如果您有多个处理器,并且一个处理器定义了绑定级别并发性,但其他处理器没有,则那些未定义绑定级别并发性的处理器将默认回通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的绑定器范围属性。
如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。spring-doc.cadn.net.cn