事务性绑定器

通过将spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix设置为非空值(例如tx-)来启用事务。在处理器应用程序中,消费者启动事务;任何由消费者线程发送的记录都参与同一事务。当监听器正常退出时,监听器容器会向事务发送偏移量并提交该事务。使用公共生产者工厂用于所有使用spring.cloud.stream.kafka.binder.transaction.producer.*属性配置的生产者绑定;忽略单个绑定Kafka生产者属性。spring-doc.cadn.net.cn

普通绑定器重试(以及死信)在事务中不受支持,因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也会被回滚。当启用重试时(常见属性maxAttempts大于零),将使用重试属性来配置DefaultAfterRollbackProcessor以在容器级别启用重试。同样地,不是在事务内发布死信记录,而是通过DefaultAfterRollbackProcessor将此功能移至监听器容器,在主事务回滚之后运行。

如果要在源应用程序中使用事务,或从任意线程为仅生产者事务(例如 @Scheduled 方法)使用事务,则必须获取事务性生产者工厂的引用,并使用它定义一个 KafkaTransactionManager bean。spring-doc.cadn.net.cn

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用BinderFactory获取绑定程序的引用;当只有一个绑定程序配置时,在第一个参数中使用null
如果配置了多个绑定程序,请使用绑定程序名称来获取引用。
一旦我们有了绑定程序的引用,就可以获得ProducerFactory的引用并创建一个事务管理器。spring-doc.cadn.net.cn

然后,您将使用正常的Spring事务支持,例如TransactionTemplate@Transactional,例如:
spring-doc.cadn.net.cn

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望将仅生产者事务与来自其他某些事务管理器的事务同步,请使用ChainedTransactionManagerspring-doc.cadn.net.cn

如果您部署了应用程序的多个实例,每个实例都需要一个唯一的transactionIdPrefix

Kafka事务中的异常重试行为

配置事务回滚重试行为

在 Kafka 事务中处理消息时,您可以使用 defaultRetryable 属性和 retryableExceptions 映射来配置哪些异常应在事务回滚后重试。spring-doc.cadn.net.cn

默认重试行为

spring-doc.cadn.net.cn

DefaultAfterRollbackProcessor 确定在事务回滚后触发重试的异常。spring-doc.cadn.net.cn

默认情况下,所有异常都将被重试,但您可以修改此行为:spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             defaultRetryable: false  # Change default to NOT retry exceptions

defaultRetryable被设置为false时,DefaultAfterRollbackProcessor将配置为defaultFalse(true),这意味着除非显式地配置为可重试的异常,否则不会重试异常。spring-doc.cadn.net.cn

特定于异常的配置

对于细粒度控制,您可以为各个异常类型指定重试行为:spring-doc.cadn.net.cn

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             retryableExceptions:
               java.lang.IllegalStateException: true    # Always retry this exception
               java.lang.IllegalArgumentException: false  # Never retry this exception

当异常标记为true时,DefaultAfterRollbackProcessor将使用addRetryableExceptions();当异常标记为false时,DefaultAfterRollbackProcessor将使用addNotRetryableExceptions()。这些针对特定异常的配置优先于默认行为。spring-doc.cadn.net.cn

实施细节