Неожиданно в разделе Spring при использовании синхронизированных - PullRequest
0 голосов
/ 13 февраля 2019

Я использую Spring Batch и Partition для параллельной обработки.Hibernate и Spring Data Jpa для БД.Для шага раздела у читателя, процессора и записывающего устройства есть приставка, и поэтому я могу ввести ключ и диапазон (от-до) к ним.Теперь в процессоре у меня есть один синхронизированный метод, и я ожидал, что этот метод будет запускаться один раз за раз, но это не так.

Я установил 10 разделов, все 10 считывателей Item читают правильный разделенный диапазон,Проблема приходит с предметом процессора.Blow-код имеет ту же логику, которую я использую.

public class accountProcessor implementes ItemProcessor{
    @override
    public Custom process(item) {
        createAccount(item);
        return item;
    }

    //account has unique constraints username, gender, and email
    /*
        When 1 thread execute that method, it will create 1 account 
        and save it. If next thread comes in and  try to save the  same  account, 
        it  should find the account created by first thread and do one update. 
        But now it doesn't happen, instead findIfExist return null 
        and it  try to do another insert of duplicate data
    */
    private synchronized void createAccount(item) {
        Account account = accountRepo.findIfExist(item.getUsername(),  item.getGender(),  item.getEmail());
        if(account  == null) {
            //account  doesn't  exist
            account = new Account();
            account.setUsername(item.getUsername());
            account.setGender(item.getGender());
            account.setEmail(item.getEmail());
            account.setMoney(10000);
        } else {
            account.setMoney(account.getMoney()-10);
        }
        accountRepo.save(account);
    }
}

Ожидаемый результат: только один поток будет запускать этот метод в любой момент времени, поэтому не будет повторяющихся вставок в БД, а также будет исключение DataintegrityViolationexception.

На самом деле результатом является то, что второй поток не может найти первую учетную запись и попытаться создать дублирующую учетную запись и сохранить в db, что приведет к DataintegrityViolationexception, ошибке уникальных ограничений.

Поскольку я синхронизировалсяметод, поток должен выполнить его по порядку, второй поток должен дождаться завершения первого потока, а затем запустить его, что означает, что он должен быть в состоянии найти первую учетную запись.

Я пробовал со многими подходами, например, как volatileчтобы он содержал все уникальные учетные записи, сделайте saveAndFlush, чтобы сделать коммиты как можно быстрее, используя threadlocal, ни одна из этих работ.

Нужна помощь.

1 Ответ

0 голосов
/ 13 февраля 2019

Поскольку вы сделали шаговую область обработчика элементов, вам не требуется синхронизация, поскольку у каждого шага будет свой экземпляр процессора.

Но похоже, что у вас проблема с дизайном, а невопрос реализации.Вы пытаетесь синхронизировать потоки в параллельном режиме в определенном порядке.Когда вы решите пойти параллельно и разделить данные на разделы и дать каждому работнику (локальному или удаленному) раздел для работы, вы должны признать, что эти разделы будут обрабатываться в неопределенном порядке и что между записями не должно быть никакой связи.каждого раздела или между работой, выполненной каждым работником.

Когда 1 поток выполнит этот метод, он создаст 1 учетную запись и сохранит ее.Если следующий поток заходит и пытается сохранить ту же учетную запись, он должен найти учетную запись, созданную первым потоком, и выполнить одно обновление.Но теперь этого не происходит, вместо этого findIfExist возвращает null и пытается выполнить еще одну вставку дубликатов данных

Это потому, что транзакция thread1 может быть еще не зафиксирована, поэтому thread2 не найдетзапись, которая, по вашему мнению, была добавлена ​​темой 1.

Похоже, вы пытаетесь создать или обновить некоторые учетные записи с помощью многораздельной настройки.Я не уверен, подходит ли эта установка для рассматриваемой проблемы.

В качестве примечания, я бы не назвал accountRepo.save(account); в процессоре элементов, а скорее сделал бы это в модуле записи элементов.

Надеюсь, это поможет.

...