Синхронизация транзакций Spring не работает (TransactionalEventListener) - PullRequest
1 голос
/ 05 августа 2020

Я знаю, что этот вопрос задавался на этом сайте в несколько ином формате, но следование советам, данным в этих сообщениях, ни к чему не привело. Я уже потратил на это около двух дней, и у меня нет идей.

У нас есть микросервис весенней загрузки, который не делает ничего, кроме того, что прислушивается к сообщению, поступающему в очередь IBM MQ, делает небольшое преобразование и отправив его на топи Kafka c. Мы хотим, чтобы это было транзакционным, чтобы не было потерянных сообщений (критично для нашего бизнеса). Мы также хотим иметь возможность реагировать на события фиксации и отката транзакции с целью мониторинга и поддержки.

Я просто выполнил несколько инструкций в inte rnet, и я могу легко достичь транзакционного поведение декларативно с использованием аннотации @Transactional, как показано ниже:

@Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "DEV.QUEUE.1", containerFactory = "mqListenerContainerFactory", concurrency = "10")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Some work here including forward to Kafka topic:
    // ...
    // ...

    // Then publish an event which is supposed to be acted on:
    applicationEventPublisher.publishEvent(new MqConsumedEvent("JMS Correlation ID", "Message Payload"));

    // Uncommented exception below to create a rollback scenario
    // or comment it out to have the processing completed
    throw new RuntimeException("No good Pal!");
}

Как и ожидалось, при воспроизведении сообщения с исключением на месте обработка будет вращаться вечно из-за того, что диспетчер транзакций откатывается снова и снова. Это хорошо для нас.

Теперь мы ожидаем, что MqConsumedEvent, публикуемое внутри нашего метода прослушивателя, будет перехвачено onRollback методом ниже:

@Component
@Slf4j
public class MqConsumedEventListener {
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = MqConsumedEvent.class)
    public void onCommit(MqConsumedEvent event) {
        log.info("MQ message with correlation id {} committed to Kafka", event.getCorrelationId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, classes = MqConsumedEvent.class)
    public void onRollback(MqConsumedEvent event) {
        log.info("Failed to commit MQ message with correlation id {} to Kafka", event.getCorrelationId());
    }
}

Этого не происходит. Подобное комментирование исключения исключения в слушателе заставляет наше сообщение MQ передаваться в Kafka. Однако метод onCommit не выполняется.

Из дальнейших исследований и отладки Spring я считаю, что это не выполняется, потому что spring думает, что при публикации события нет активной транзакции, и такое мое событие просто игнорируется. Оценка TransactionSynchronizationManager.isActualTransactionActive() и печать его в журналах показывает false, что трудно объяснить, потому что, как я уже сказал, откаты транзакций, как и ожидалось, когда исключение выбрасывается намеренно.

Заранее благодарим вас за ваш вклад.

ОБНОВЛЕНИЕ:

Установленные мной точки останова привели меня к выполнению этого ApplicationListenerMethodTransactionalAdapter класса:

@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (TransactionSynchronizationManager.isSynchronizationActive() &&
            TransactionSynchronizationManager.isActualTransactionActive()) {
        TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
        TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
    }
    else if (this.annotation.fallbackExecution()) {
        if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
            logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
        }
        processEvent(event);
    }
    else {
        // No transactional event execution at all
        if (logger.isDebugEnabled()) {
            logger.debug("No transaction is active - skipping " + event);
        }
    }
}

По какой-то причине я не понимание первого, если условие ложно. Тогда резервное выполнение будет false, поскольку я не установил его true в моем @TransactionalEventListener использовании, он закончится в ветке else и просто пропустит событие.

...