|
这个版本仍在开发中,尚未达到稳定状态。要使用最新稳定版,请使用 spring-cloud-stream 5.0.1 ! |
互动查询
Kafka Streams 绑定器 API 暴露了一个名为 InteractiveQueryService 的类,用于与状态存储进行交互式查询。您可以在应用程序中将其作为 Spring Bean 访问。从应用程序中获取此 Bean 的简单方法是 autowire 这个 Bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
Once you gain access to this bean, then you can query for the particular state-store that you are interested. See below.
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,上述调用retrieve方法从存储中获取数据时可能会失败。例如,它可能仍处于中间状态初始化过程。 在这种情况下,重试此操作会很有用。Kafka Streams绑定器提供了一个简单的重试机制来满足这种需求。
您可使用以下两个属性来控制重试行为。
-
默认值为 1。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认是
1000毫秒。
如果运行了多个 Kafka 流应用程序实例,那么在可以对其进行交互式查询之前,需要确定哪个应用实例托管要查询的特定密钥。
0 API 提供了用于确定主机信息的方法。
在进行此操作之前,必须按如下所示配置属性application.server:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
这里有一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
关于这些主机查找方法的更多信息,请参阅这些方法的Javadoc。 对于这些方法,在启动期间,如果底层KafkaStreams对象尚未准备就绪,可能会抛出异常。 前述重试属性也适用于这些方法。
可通过交互式查询服务使用的其他API方法
使用以下 API 方法来检索与给定商店和键组合相关的 KeyQueryMetadata 对象。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法来检索与给定商店和键组合相关的 KakfaStreams 对象。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
定制商店查询参数
有时,在通过InteractiveQueryService查询存储之前,需要调整存储查询参数。为此,从绑定器的4.0.1版本开始,您可以提供一个StoreQueryParametersCustomizer,它是一个具有方法customize的函数式接口,该方法采用StoreQueryParameter作为参数。 这是它的方法签名。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
使用这种方法,应用程序可以进一步自定义StoreQueryParameters,例如启用旧存储。
当该 Bean 在此应用程序中存在时,InteractiveQueryService 将在查询状态存储之前调用其 customize 方法。
请记住,应用程序中必须有一个唯一的StoreQueryParametersCustomizer类型的bean可用。 |