У меня много транзакционных потребителей с ChainedKafkaTransactionManager
на основе JpaTransactionManager
и KafkaTransactionManager
(все @KafkaListener
).
Для JPA необходимо установить переменную ThreadLocal, чтобы иметь возможность узнать, к какой БД подключаться (это идентификатор клиента).
При запуске приложения в onPartitionsAssigned
listener spring-kafka пытается создать цепочка txn, следовательно, пытается создать JPA txn, но нет набора арендаторов, а затем происходит сбой.
Этот арендатор устанавливается через http-фильтр и / или перехватчики kafka (через заголовки событий).
Я попытался использовать автоматическую KafkaListenerEndpointRegistry
с setAutoStartup(false)
, но я вижу, что потребители не получают никаких событий, возможно, потому что они еще не инициализированы (я думал, что они были инициализированы по требованию).
Если я установлю фиктивный идентификатор арендатора и вызову registry.start()
, когда приложение будет готово, инициализация будет выполняться в других потоках (возможно, потому что я использую ConcurrentKafkaListenerContainerFactory
), поэтому это не так работа.
Есть ли способ избежать транзакции JPA на этот начальный onPartitionsAssigned
слушатель, то есть часть инициализации потребителя?