配置选项
本节包含Kafka Streams绑定器使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档。
流处理绑定属性
绑定器级别的以下属性可用,并且必须使用spring.cloud.stream.kafka.streams.binder.作为前缀。在Kafka流绑定器中重复使用的任何由提供的Kafka绑定器属性都必须使用spring.cloud.stream.kafka.streams.binder而不是spring.cloud.stream.kafka.binder作为前缀。此规则唯一的例外是在定义Kafka引导服务器属性时,任一前缀都可以工作。
- 配置
-
包含与 Apache Kafka Streams API 相关的属性键/值对映射。此属性必须使用
spring.cloud.stream.kafka.streams.binder.前缀。以下是一些使用此属性的示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关流配置中可能包含的所有属性的更多信息,请参阅Apache Kafka Streams文档中的StreamsConfig JavaDoc。可以通过StreamsConfig设置的任何配置都可以通过此方式进行设置。使用此属性时,由于这是绑定器级别的属性,因此适用于整个应用程序。如果您的应用程序中有多个处理器,则它们都将获取这些属性。在像application.id这样的情况下,这将变得有问题,因此您必须仔细检查如何使用此绑定器级别configuration属性映射来自StreamsConfig的属性。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。 可以用于为应用程序中的每个功能设置应用ID。 在有多个功能的情况下,这是一种方便的方式来设置应用ID。
- functions.<function-bean-name>.configuration
-
仅适用于函数式风格处理器。
包含与Apache Kafka Streams API相关的属性的键/值对映射。
这类似于上述描述的绑定器级别的configuration属性,但这个configuration级别的属性仅限于命名函数使用。
当您有多个处理器并且希望根据特定函数限制配置访问时,可能需要使用此功能。
所有StreamsConfig属性都可以在此处使用。 - 经纪人
-
代理网址
默认值:
localhost - zk节点
-
Zookeeper 地址
默认值:
localhost - 反序列化异常处理程序
-
反序列化错误处理器类型。 此处理器应用于绑定器级别,因此对应用程序中的所有输入绑定都适用。 在消费者绑定级别可以更细致地控制它。 可能的值为 -
logAndContinue、logAndFail、skipAndContinue或sendToDlq默认值:
logAndFail - applicationId
-
在绑定器级别全局设置 Kafka Streams 应用程序的应用程序 ID 的便捷方法。如果应用程序包含多个函数,则应分别设置应用程序 ID。详细讨论请参阅上文。
默认情况下,应用程序会生成一个静态的应用程序ID。有关更多详细信息,请参阅应用程序ID部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大重试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接到状态存储的退避周期。
默认值:1000 毫秒
- 消费者属性
-
绑定器级别的任意消费者属性。
- 生产者属性
-
绑定器级别的任意生产者属性。
- 包含已停止处理器的健康检查
-
当通过执行器停止处理器的绑定时,此处理器默认将不参与健康检查。将此属性设置为
true可启用所有处理器(包括当前通过绑定执行器端点停止的处理器)的健康检查。(默认值:false)
Kafka Streams Producer 属性
以下属性仅适用于Kafka Streams生产者,且必须使用spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.作为前缀。
为了方便起见,如果存在多个输出绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.进行配置。
- 密钥序列化器
-
键序列化器/反序列化器
默认:参见上述关于消息序列化/反序列化的讨论
- 值序列化器
-
序列化/反序列化器
默认:参见上述关于消息序列化/反序列化的讨论
- 使用原生编码
-
启用/禁用原生编码的标志
默认值:
true。 - 流分区程序Bean名称
-
消费者处要使用的自定义传出分区程序bean名称。
应用程序可以提供自定义StreamPartitioner作为Spring bean,该bean的名称可被提供给生产者以代替默认值使用。默认:参见上文关于出站分区支持的讨论。
- 产生的
-
处理器生产数据的目标接收组件的自定义名称。
默认值:
none(由Kafka Streams生成)
Kafka Streams 消费者属性
以下属性可用于Kafka Streams消费者,并且必须使用spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.作为前缀。为了方便起见,如果存在多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.进行配置。
- applicationId
-
根据输入绑定设置 application.id。
默认值:见上文。
- 密钥序列化器
-
键序列化器/反序列化器
默认:参见上述关于消息序列化/反序列化的讨论
- 值序列化器
-
序列化/反序列化器
默认:参见上述关于消息序列化/反序列化的讨论
- 物化为
-
使用传入的 KTable 类型时,用于生成状态存储
默认值:
none。 - 缓存已禁用
-
为物化KTable禁用缓存。
当设置为true时,在Materialized对象上调用withCachingDisabled()。
当设置为false时,在Materialized对象上调用withCachingEnabled()。默认值:
false。 - 日志记录已禁用
-
为物化KTable禁用日志记录。当设置为
true时,会在Materialized对象上调用withLoggingDisabled()。默认值:
false。 - 使用原生解码
-
启用/禁用原生解码的标志
默认值:
true。 - 死信队列名称
-
死信队列主题名称。
默定:请参考上面错误处理和DLQ的讨论。
- startOffset
-
如果消费者没有已提交的偏移量可以从中消费,则开始的位置偏移。 这主要用于当消费者第一次从主题中消费时的情况。 Kafka Streams 使用
earliest作为默认策略,绑定器也使用相同的默认值。 可以通过此属性将其覆盖为latest。默认值:
earliest。
注:在消费者中使用 resetOffsets 对 Kafka Streams 绑定器没有任何影响。
与基于消息通道的绑定器不同,Kafka Streams 绑定器不会根据需求寻求开始或结束位置。
- 反序列化异常处理程序
-
反序列化错误处理程序类型。此处理器按消费者绑定应用,而不是前面所述的绑定器级别属性。
可能的值为 -logAndContinue、logAndFail、skipAndContinue或sendToDlq默认值:
logAndFail - 时间戳提取器Bean名称
-
消费者使用的特定时间戳提取器的bean名称。
应用程序可以提供TimestampExtractor作为Spring bean,并且此bean的名称可以提供给消费者,以代替默认值使用。默认值:参见上文关于时间戳提取器的讨论。
- 事件类型
-
此绑定支持的事件类型的逗号分隔列表。
默认值:
none - 事件类型标头键
-
通过此绑定传入的每个记录上的事件类型标题键。
默认值:
event_type - 已消费作为
-
处理器从中消费的源组件的自定义名称。
默认值:
none(由 Kafka Streams 生成)
关于并发的特别说明
在 Kafka Streams 中,您可以使用 num.stream.threads 属性来控制处理器可以创建的线程数量。
您可以通过上述绑定器、函数、生产者或消费者级别的各种 configuration 选项来实现这一点。
您还可以使用核心 Spring Cloud Stream 提供的 concurrency 属性来达到此目的。
使用此属性时,需要将其应用于消费者。
当您有多个输入绑定时,在第一个输入绑定上设置它。
例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,将由绑定器转换为 num.stream.threads。
如果您有多个处理器,并且一个处理器定义了绑定级别并发性,但其他处理器没有,则那些未定义绑定级别并发性的处理器将默认回通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的绑定器范围属性。
如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。