此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-stream 4.3.0! |
事务性活页夹
通过设置启用事务spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
设置为非空值,例如tx-
.
当在处理器应用程序中使用时,消费者启动事务;在使用者线程上发送的任何记录都参与同一事务。
当监听器正常退出时,监听器容器会将偏移量发送给事务并提交。
一个通用的生产者工厂用于使用spring.cloud.stream.kafka.binder.transaction.producer.*
性能;忽略单个绑定 Kafka 生产者属性。
事务不支持正常的活页夹重试(和死信),因为重试将在原始事务中运行,该事务可能会回滚,任何已发布的记录也将回滚。
启用重试时(公共属性maxAttempts 大于零),重试属性用于配置DefaultAfterRollbackProcessor 以启用容器级别的重试。
同样,此功能不是在事务中发布死信记录,而是通过DefaultAfterRollbackProcessor 在主事务回滚后运行。 |
如果您希望在源应用程序中使用事务,或者从某个任意线程中用于仅生产者事务(例如@Scheduled
方法),您必须获取对事务性生产者工厂的引用,并定义一个KafkaTransactionManager
bean 使用它。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
请注意,我们使用BinderFactory
;用null
在第一个参数中,当只有一个 Binder 配置时。
如果配置了多个活页夹,请使用活页夹名称获取引用。
一旦我们有了对 binder 的引用,我们就可以获得对ProducerFactory
并创建事务管理器。
然后您将使用正常的 Spring 事务支持,例如TransactionTemplate
或@Transactional
例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅生产者事务与其他事务管理器中的事务同步,请使用ChainedTransactionManager
.
如果您部署应用程序的多个实例,则每个实例都需要一个唯一的transactionIdPrefix . |
Kafka 事务中的异常重试行为
默认重试行为
这DefaultAfterRollbackProcessor
确定哪些异常在事务回滚后触发重试。
默认情况下,将重试所有异常,但您可以修改此行为:
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
defaultRetryable: false # Change default to NOT retry exceptions
什么时候defaultRetryable
设置为false
这DefaultAfterRollbackProcessor
将配置为defaultFalse(true)
,这意味着除非显式配置为可重试,否则不会重试异常。
特定于异常的配置
对于细粒度控制,可以为各个异常类型指定重试行为:
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
retryableExceptions:
java.lang.IllegalStateException: true # Always retry this exception
java.lang.IllegalArgumentException: false # Never retry this exception
这DefaultAfterRollbackProcessor
将使用addRetryableExceptions()
对于标记为true
和addNotRetryableExceptions()
对于标记为false
.
这些特定于异常的配置优先于默认行为。