Я пытаюсь настроить ConcurrentKafkaListenerContainerFactory следующим образом:
Если ошибка (например, IllegalArgumentException) вызывает откат в транзакции, которая обрабатывает определенное сообщение, будет другая попытка обработать то же сообщение.
Если следующие попытки обработать это сообщение продолжают вызывать ошибки и вызывать откат в транзакции, это сообщение должно быть отправлено в новый topi c (DLT). снова обрабатывается.
Проблема в том, что это сообщение всегда отправляется DLT, помеченному как «откат». Поэтому я могу только читать эти сообщения из DLT, когда используется уровень изоляции = read_uncommitted. Это ожидаемое поведение? Можно ли изменить конфигурацию так, чтобы сообщения отправлялись в DLT как зафиксированные?
Если это поведение невозможно изменить, то возможно ли настроить только указанный c прослушиватель DLT (но не другой слушатели в том же приложении) с уровнем изоляции: read_uncomited?
Это моя конфигурация: spring-boot (2.2.5) spring-kafka (2.4.3)
application.properties
spring.kafka.producer.acks=all
spring.kafka.listener.concurrency=1
spring.kafka.listener.ack-mode=RECORD
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed
Spring Конфигурация
@Bean
public JpaTransactionManager transactionManager() {
return new JpaTransactionManager();
}
@Inject
private ProducerFactory<?, ?> producerFactory;
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
}
@Bean
public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
KafkaTransactionManager<?, ?> kafka) {
kafka.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
kafka.setTransactionIdPrefix("kafka-tx-");
jpa.setRollbackOnCommitFailure(true);
jpa.setNestedTransactionAllowed(true);
return new ChainedKafkaTransactionManager<>(kafka, jpa);
}
@Component
class ContainerFactoryConfigurer {
ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
ChainedKafkaTransactionManager<?, ?> tm,
KafkaTemplate<Object, Object> template) {
factory.getContainerProperties().setTransactionManager(tm);
DefaultAfterRollbackProcessor rollbackProcessor = new DefaultAfterRollbackProcessor((record, exception) -> {}, new FixedBackOff(0L, 1L));
rollbackProcessor.setCommitRecovered(true);
rollbackProcessor.setKafkaTemplate(template);
factory.setAfterRollbackProcessor(rollbackProcessor);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 1L));
errorHandler.setCommitRecovered(true);
errorHandler.setAckAfterHandle(true);
factory.setErrorHandler(errorHandler);
factory.getContainerProperties().setAckOnError(false);
}
}