Это можно сделать, но это немного сложно, потому что вы должны отправить потребленные смещения в транзакцию Kafka.
Вместо использования ChainedKafkaTransactionManager
, вы можете использовать KafkaTransactionManager
для контейнера и @Transactional
для HibernateTransactionManager
.
Это даст аналогичные результаты, поскольку гибернационный TX будет фиксироваться непосредственно перед транзакцией Kafka (и, если фиксация hibernate завершится неудачно, Kafka TX откатится ).
РЕДАКТИРОВАТЬ
Чтобы настроить различные цепочки TM для каждого контейнера слушателя, вы можете сделать что-то вроде этого.
@ Класс компонента ContainerFactoryCustomizer {
ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
ChainedKafkaTransactionManager<?, ?> chainedOne,
ChainedKafkaTransactionManager<?, ?> chainedTwo) {
factory.setContainerCustomizer(
container -> {
String groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("foo")) {
container.getContainerProperties().setTransactionManager(chainedOne);
}
else {
container.getContainerProperties().setTransactionManager(chainedTwo);
}
});
}
}
Where each chained TM has a Hibernate TM with a different default timeout.
The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.