Присвоение разделов и ChainedKafkaTransactionManager при запуске с JPA - PullRequest
1 голос
/ 29 января 2020

У меня много транзакционных потребителей с ChainedKafkaTransactionManager на основе JpaTransactionManager и KafkaTransactionManager (все @KafkaListener).

Для JPA необходимо установить переменную ThreadLocal, чтобы иметь возможность узнать, к какой БД подключаться (это идентификатор клиента).

При запуске приложения в onPartitionsAssigned listener spring-kafka пытается создать цепочка txn, следовательно, пытается создать JPA txn, но нет набора арендаторов, а затем происходит сбой.

Этот арендатор устанавливается через http-фильтр и / или перехватчики kafka (через заголовки событий).

Я попытался использовать автоматическую KafkaListenerEndpointRegistry с setAutoStartup(false), но я вижу, что потребители не получают никаких событий, возможно, потому что они еще не инициализированы (я думал, что они были инициализированы по требованию).

Если я установлю фиктивный идентификатор арендатора и вызову registry.start(), когда приложение будет готово, инициализация будет выполняться в других потоках (возможно, потому что я использую ConcurrentKafkaListenerContainerFactory), поэтому это не так работа.

Есть ли способ избежать транзакции JPA на этот начальный onPartitionsAssigned слушатель, то есть часть инициализации потребителя?

1 Ответ

1 голос
/ 29 января 2020

Если у вашей TM-цепочки сначала есть KafkaTM, а затем JPA-TM (что было бы нормальным случаем), вы можете добиться аналогичной функциональности, просто введя Kafka-TM в контейнер и используя @Transactional (только с JPA-TM). на слушателе) для запуска транзакции JPA при вызове слушателя.

Время между фиксациями транзакции будет незначительно увеличено, но это обеспечит аналогичную функциональность.

Если это не сработает для вас откройте вопрос GitHub; мы можем либо отключить начальную фиксацию при назначении, либо сделать это вообще без транзакции (опционально).

...