|
对于最新稳定版本,请使用spring-cloud-stream 5.0.1! |
使用Spring CloudSleuth跟踪
当Spring Cloud Sleuth位于类路径中时,基于Spring Cloud Stream Kafka Streams绑定器的应用程序的消费者和生产者都会自动使用跟踪信息进行仪器记录。但是,为了对任何应用程序特定的操作进行跟踪,需要由用户代码显式地对其进行仪器记录。这可以通过在应用程序中注入Spring Cloud Sleuth的0 bean来完成,然后通过此注入的bean调用各种Kafka Streams操作。下面是使用它的几个示例。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
在上面的示例中,有两个地方添加了显式跟踪仪器。首先,我们记录传入的KStream中的键/值信息。当记录这些信息时,相关的跨度和跟踪ID也会被记录下来,这样监控系统就可以跟踪它们并与此相同跨度相关联。其次,当我们调用一个map操作时,我们不是直接在KStream类上进行调用,而是将其包装在一个transform操作中,然后从KafkaStreamsTracing调用map。在这种情况下,日志消息也将包含跨度和跟踪ID。
这是另一个示例,我们使用低级转换器 API 访问各种 Kafka 流标题。</p> <p>如果类路径上有 spring-cloud-sleuth,还可以像这样访问所有跟踪标头。
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}