Сообщения, отправленные DeadLetterPublishingRecoverer после того, как исключение UnexpectedRollbackException не зафиксировано - PullRequest
0 голосов
/ 05 марта 2020

Я пытаюсь настроить ConcurrentKafkaListenerContainerFactory следующим образом:

  • Если ошибка (например, IllegalArgumentException) вызывает откат в транзакции, которая обрабатывает определенное сообщение, будет другая попытка обработать то же сообщение.

  • Если следующие попытки обработать это сообщение продолжают вызывать ошибки и вызывать откат в транзакции, это сообщение должно быть отправлено в новый topi c (DLT). снова обрабатывается.

Проблема в том, что это сообщение всегда отправляется DLT, помеченному как «откат». Поэтому я могу только читать эти сообщения из DLT, когда используется уровень изоляции = read_uncommitted. Это ожидаемое поведение? Можно ли изменить конфигурацию так, чтобы сообщения отправлялись в DLT как зафиксированные?

Если это поведение невозможно изменить, то возможно ли настроить только указанный c прослушиватель DLT (но не другой слушатели в том же приложении) с уровнем изоляции: read_uncomited?

Это моя конфигурация: spring-boot (2.2.5) spring-kafka (2.4.3)

application.properties
spring.kafka.producer.acks=all
spring.kafka.listener.concurrency=1
spring.kafka.listener.ack-mode=RECORD
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed

Spring Конфигурация

        @Bean
        public JpaTransactionManager transactionManager() {
            return new JpaTransactionManager();
        }

        @Inject
        private ProducerFactory<?, ?> producerFactory;

        @Bean
        public KafkaTransactionManager kafkaTransactionManager() {
            KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
            ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
            return ktm;
        }

        @Bean
        public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
                                                                KafkaTransactionManager<?, ?> kafka) {

            kafka.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
            kafka.setTransactionIdPrefix("kafka-tx-");
            jpa.setRollbackOnCommitFailure(true);
            jpa.setNestedTransactionAllowed(true);
            return new ChainedKafkaTransactionManager<>(kafka, jpa);
        }


    @Component
    class ContainerFactoryConfigurer {

        ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
                                    ChainedKafkaTransactionManager<?, ?> tm,
                                    KafkaTemplate<Object, Object> template) {

            factory.getContainerProperties().setTransactionManager(tm);
            DefaultAfterRollbackProcessor rollbackProcessor = new DefaultAfterRollbackProcessor((record, exception) -> {}, new FixedBackOff(0L, 1L));
            rollbackProcessor.setCommitRecovered(true);
            rollbackProcessor.setKafkaTemplate(template);
            factory.setAfterRollbackProcessor(rollbackProcessor);
            SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                    new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 1L));
            errorHandler.setCommitRecovered(true);
            errorHandler.setAckAfterHandle(true);
            factory.setErrorHandler(errorHandler);           
            factory.getContainerProperties().setAckOnError(false);
        }
    }

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...