渠道结果发送程序

从版本 4.0.3 开始,您可以配置 resultMetadataChannel 来接收 SenderResult<?> 以确定发送的成功/失败。spring-doc.cadn.net.cn

SenderResult 包含 correlationMetadata,用于将结果与发送相关联;它还包含 RecordMetadata,指示已发送记录的 TopicPartition 和偏移量。spring-doc.cadn.net.cn

代码resultMetadataChannel必须是FluxMessageChannel实例。必须spring-doc.cadn.net.cn

以下是使用此功能的示例,其中相关联的元数据类型为Integerspring-doc.cadn.net.cn

@Bean
FluxMessageChannel sendResults() {
    return new FluxMessageChannel();
}

@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
    if (result.exception() != null) {
        failureFor(result);
    }
    else {
        successFor(result);
    }
}

要将关联元数据设置到输出记录上,请设置CORRELATION_ID头:spring-doc.cadn.net.cn

streamBridge.send("words1", MessageBuilder.withPayload("foobar")
        .setCorrelationId(42)
        .build());

使用该功能时,如果参数为Function,则函数输出类型必须是Message<?>,并且关联ID头部需要设置为目标值。spring-doc.cadn.net.cn

元数据应是唯一的,至少在发送期间如此。spring-doc.cadn.net.cn