对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
交互式查询
Kafka Streams 绑定器 API 公开了一个名为InteractiveQueryService
以交互方式查询状态存储。
您可以在应用程序中以 Spring Bean 的形式访问它。从应用程序访问此 Bean 的一种简单方法是autowire
豆子。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦您获得了对此 bean 的访问权限,您就可以查询您感兴趣的特定状态存储。见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,上述检索存储的方法调用可能会失败。 例如,它可能仍在初始化状态存储的过程中。 在这种情况下,重试此作会很有用。 Kafka Streams 绑定器提供了一个简单的重试机制来适应这一点。
下面是可用于控制此重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为
1
. -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为
1000
毫秒。
如果有多个 kafka 流应用程序实例正在运行,那么在以交互方式查询它们之前,您需要确定哪个应用程序实例托管了您要查询的特定密钥。InteractiveQueryService
API 提供了用于识别主机信息的方法。
为了使其正常工作,您必须配置属性application.server
如下:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有关这些主机查找方法的更多信息,请参阅有关这些方法的 Javadoc。 对于这些方法,在启动期间,如果底层 KafkaStreams 对象尚未准备就绪,它们可能会抛出异常。 上述重试属性也适用于这些方法。
通过 InteractiveQueryService 提供的其他 API 方法
使用以下 API 方法检索KeyQueryMetadata
与给定存储和键的组合相关联的对象。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法检索KakfaStreams
与给定存储和键的组合相关联的对象。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
自定义商店查询参数
有时需要先微调 store 查询参数,然后才能通过InteractiveQueryService
.
为此,从4.0.1
版本的 Binder 中,你可以为StoreQueryParametersCustomizer
这是一个功能接口,具有customize
采用StoreQueryParameter
作为论据。
这是它的方法签名。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
使用此方法,应用程序可以进一步自定义StoreQueryParameters
例如启用陈旧的商店。
当此 bean 存在于此应用程序中时,InteractiveQueryService
会调用其customize
方法。
请记住,必须有一个唯一的 beanStoreQueryParametersCustomizer 在应用程序中可用。 |