此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0spring-doc.cadn.net.cn

使用 Spring Cloud Sleuth 进行跟踪

当 Spring Cloud Sleuth 位于基于 Spring Cloud Stream Kafka Streams 绑定程序的应用程序的类路径上时,其使用者和生产者都会自动检测跟踪信息。 但是,为了跟踪任何特定于应用程序的作,需要由用户代码显式检测这些作。 这可以通过注入KafkaStreamsTracingbean 来自应用程序中的 Spring Cloud Sleuth,然后通过这个注入的 bean 调用各种 Kafka Streams作。 以下是一些使用它的示例。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(),
                        value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream());
}

在上面的示例中,有两个地方添加了显式跟踪检测。 首先,我们记录传入的键/值信息KStream. 记录此信息时,也会记录关联的跨度和跟踪 ID,以便监控系统可以跟踪它们并与同一跨度 ID 相关联。 其次,当我们调用map作,而不是直接在KStream类,我们将其包装在一个transform作,然后调用mapKafkaStreamsTracing. 在这种情况下,记录的消息也将包含跨度 ID 和跟踪 ID。spring-doc.cadn.net.cn

这是另一个示例,我们使用低级转换器 API 来访问各种 Kafka Streams 标头。 当 spring-cloud-sleuth 在类路径上时,也可以像这样访问所有跟踪标头。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}