Как использовать @Transactional с @KafkaListener? - PullRequest
2 голосов
/ 10 апреля 2020

Есть ли возможность использовать декларативное управление tx (через @Transactional) с аннотированным методом @KafkaListener? Я хотел бы использовать его для того, чтобы, например, определить отдельный тайм-аут для каждого слушателя. Моя установка выглядит следующим образом:

TransactionManager:

@Bean
        @ConditionalOnBean(value = {HibernateTransactionManager.class})
        public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
                                                                                 org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
            return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, hibernateTransactionManager);
        }

KafkaListener:

@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}

Проблема в том, что - KafkaMessageListenerContainer создает свою собственную транзакцию до вызова такого метода - она использует свой собственный шаблон TransactionTemplate:

@Nullable
        private TransactionTemplate determineTransactionTemplate() {
            return this.transactionManager != null
                    ? new TransactionTemplate(this.transactionManager)
                    : null;
        }

TransactionInterceptor не используется. Так как установить заданный тайм-аут c tx для конкретного метода @KafkaListener?

Ответы [ 2 ]

1 голос
/ 10 апреля 2020

Это можно сделать, но это немного сложно, потому что вы должны отправить потребленные смещения в транзакцию Kafka.

Вместо использования ChainedKafkaTransactionManager, вы можете использовать KafkaTransactionManager для контейнера и @Transactional для HibernateTransactionManager.

Это даст аналогичные результаты, поскольку гибернационный TX будет фиксироваться непосредственно перед транзакцией Kafka (и, если фиксация hibernate завершится неудачно, Kafka TX откатится ).

РЕДАКТИРОВАТЬ

Чтобы настроить различные цепочки TM для каждого контейнера слушателя, вы можете сделать что-то вроде этого.

@ Класс компонента ContainerFactoryCustomizer {

ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
        ChainedKafkaTransactionManager<?, ?> chainedOne,
        ChainedKafkaTransactionManager<?, ?> chainedTwo) {
    factory.setContainerCustomizer(
            container -> {
                String groupId = container.getContainerProperties().getGroupId();
                if (groupId.equals("foo")) {
                    container.getContainerProperties().setTransactionManager(chainedOne);
                }
                else {
                    container.getContainerProperties().setTransactionManager(chainedTwo);
                }
            });
}

}


Where each chained TM has a Hibernate TM with a different default timeout.

The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.
0 голосов
/ 10 апреля 2020

Аспекты управления транзакциями через обычный @Transactional не работают с прослушивателями Kafka, как вы и предполагали - встроенного взаимодействия нет.

Существует отдельная глава о том, как использовать события приложения в транзакции: Событие, связанное с транзакцией с использованием аннотации @TransactionalEventListener помогает убедиться, что транзакция завершена успешно.

Вы можете зарегистрировать обычный прослушиватель событий с помощью аннотации @EventListener. Если вам нужно привязать его к транзакции, используйте @TransactionalEventListener. Когда вы делаете это, слушатель по умолчанию привязывается к фазе фиксации транзакции.

...