Я знаю, что этот вопрос задавался на этом сайте в несколько ином формате, но следование советам, данным в этих сообщениях, ни к чему не привело. Я уже потратил на это около двух дней, и у меня нет идей.
У нас есть микросервис весенней загрузки, который не делает ничего, кроме того, что прислушивается к сообщению, поступающему в очередь IBM MQ, делает небольшое преобразование и отправив его на топи Kafka c. Мы хотим, чтобы это было транзакционным, чтобы не было потерянных сообщений (критично для нашего бизнеса). Мы также хотим иметь возможность реагировать на события фиксации и отката транзакции с целью мониторинга и поддержки.
Я просто выполнил несколько инструкций в inte rnet, и я могу легко достичь транзакционного поведение декларативно с использованием аннотации @Transactional
, как показано ниже:
@Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "DEV.QUEUE.1", containerFactory = "mqListenerContainerFactory", concurrency = "10")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
// Some work here including forward to Kafka topic:
// ...
// ...
// Then publish an event which is supposed to be acted on:
applicationEventPublisher.publishEvent(new MqConsumedEvent("JMS Correlation ID", "Message Payload"));
// Uncommented exception below to create a rollback scenario
// or comment it out to have the processing completed
throw new RuntimeException("No good Pal!");
}
Как и ожидалось, при воспроизведении сообщения с исключением на месте обработка будет вращаться вечно из-за того, что диспетчер транзакций откатывается снова и снова. Это хорошо для нас.
Теперь мы ожидаем, что MqConsumedEvent, публикуемое внутри нашего метода прослушивателя, будет перехвачено onRollback
методом ниже:
@Component
@Slf4j
public class MqConsumedEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = MqConsumedEvent.class)
public void onCommit(MqConsumedEvent event) {
log.info("MQ message with correlation id {} committed to Kafka", event.getCorrelationId());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, classes = MqConsumedEvent.class)
public void onRollback(MqConsumedEvent event) {
log.info("Failed to commit MQ message with correlation id {} to Kafka", event.getCorrelationId());
}
}
Этого не происходит. Подобное комментирование исключения исключения в слушателе заставляет наше сообщение MQ передаваться в Kafka. Однако метод onCommit
не выполняется.
Из дальнейших исследований и отладки Spring я считаю, что это не выполняется, потому что spring думает, что при публикации события нет активной транзакции, и такое мое событие просто игнорируется. Оценка TransactionSynchronizationManager.isActualTransactionActive()
и печать его в журналах показывает false
, что трудно объяснить, потому что, как я уже сказал, откаты транзакций, как и ожидалось, когда исключение выбрасывается намеренно.
Заранее благодарим вас за ваш вклад.
ОБНОВЛЕНИЕ:
Установленные мной точки останова привели меня к выполнению этого ApplicationListenerMethodTransactionalAdapter
класса:
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
По какой-то причине я не понимание первого, если условие ложно. Тогда резервное выполнение будет false
, поскольку я не установил его true
в моем @TransactionalEventListener
использовании, он закончится в ветке else и просто пропустит событие.