я запускаю приложение весенней загрузки с базой данных mysql и kafka в качестве службы обмена сообщениями (выполнял синхронизацию транзакций с использованием chainedKafkaTransactionManager для kafka и mysql) для некоторых асинхронных операций.
когда несколько сообщений поступают в слушатель kafka иногдастарые данные, поступающие из базы данных вместо ранее переданных данных.
Я использую хранилище crud, и этот случай происходит только для нескольких сообщений одновременно
пример: обновление одного объектаЧЕЛОВЕК, имеющий имя и идентификатор;
1-е сообщение получит объект по идентификатору и обновит имя как SAM.
2-е сообщение получит объект и обновит имя как REGO
3-е сообщение получит объектзатем, если я проверяю данные, которые он содержит SAM в качестве имени, но в базе данных он имеет REGO.
Я попытался добавить свойство изоляции как чтение совершено в транзакции, но не повезло
// listeners
@Autowired
private PersonRepository personRepository;
@Autowired
private AddressRepository addressRepository;
@KafkaListener(id = "update_name", topics = "update_name")
@Transactional(readOnly = false)
public void updateName(PersonModel personModel) {
Person person = personRepository.findById(personModel.getId());
log.info("before -> name which is in database : " + person.getName());
person.setName(personModel.getName());
person = personRepository.save(person);
log.info("after-> name which is in database : " + person.getName());
}
@KafkaListener(id = "update_name_and_address", topics = "update_name_and_address")
@Transactional(readOnly = false)
public void updateNameAndAddress(PersonModel personModel) {
Address addr= addressRepository.findById(personModel.getAddresId);
addr.setPlace(personModel.getPlace());
addressRepository.save(addr);
updateName(personModel);
}
// repository
public interface PersonRepository extends CrudRepository<Person , Integer> {
}
Мне нужно последнееданные из базы данных