Я использую Spring Batch и Partition для параллельной обработки.Hibernate и Spring Data Jpa для БД.Для шага раздела у читателя, процессора и записывающего устройства есть приставка, и поэтому я могу ввести ключ и диапазон (от-до) к ним.Теперь в процессоре у меня есть один синхронизированный метод, и я ожидал, что этот метод будет запускаться один раз за раз, но это не так.
Я установил 10 разделов, все 10 считывателей Item читают правильный разделенный диапазон,Проблема приходит с предметом процессора.Blow-код имеет ту же логику, которую я использую.
public class accountProcessor implementes ItemProcessor{
@override
public Custom process(item) {
createAccount(item);
return item;
}
//account has unique constraints username, gender, and email
/*
When 1 thread execute that method, it will create 1 account
and save it. If next thread comes in and try to save the same account,
it should find the account created by first thread and do one update.
But now it doesn't happen, instead findIfExist return null
and it try to do another insert of duplicate data
*/
private synchronized void createAccount(item) {
Account account = accountRepo.findIfExist(item.getUsername(), item.getGender(), item.getEmail());
if(account == null) {
//account doesn't exist
account = new Account();
account.setUsername(item.getUsername());
account.setGender(item.getGender());
account.setEmail(item.getEmail());
account.setMoney(10000);
} else {
account.setMoney(account.getMoney()-10);
}
accountRepo.save(account);
}
}
Ожидаемый результат: только один поток будет запускать этот метод в любой момент времени, поэтому не будет повторяющихся вставок в БД, а также будет исключение DataintegrityViolationexception.
На самом деле результатом является то, что второй поток не может найти первую учетную запись и попытаться создать дублирующую учетную запись и сохранить в db, что приведет к DataintegrityViolationexception, ошибке уникальных ограничений.
Поскольку я синхронизировалсяметод, поток должен выполнить его по порядку, второй поток должен дождаться завершения первого потока, а затем запустить его, что означает, что он должен быть в состоянии найти первую учетную запись.
Я пробовал со многими подходами, например, как volatileчтобы он содержал все уникальные учетные записи, сделайте saveAndFlush, чтобы сделать коммиты как можно быстрее, используя threadlocal, ни одна из этих работ.
Нужна помощь.