Одновременное сохранение с использованием JpaRepository - PullRequest
0 голосов
/ 11 июля 2019

Я работаю над микросервисом, который обслуживает конечные точки REST для сохранения / извлечения данных в / из базы данных, используя данные пружины.

Позволяет вызывать класс сущностей Foo, который имеет простое Longдля его поля ID некоторые другие поля данных.Идентификаторы для каждого Foo не генерируются автоматически в этой службе, они предоставляются из внешнего источника, который знает, как сделать их уникальными.

Служба имеет одну конечную точку POST, которая выполняет функции создания и обновления:модель CRUD, которая вызывает соответствующую функцию на служебном уровне кода, назовем эту функцию AddData(Foo foo_incoming).Тело сообщения POST содержит данные для сохранения в базе данных и идентификатор Foo для сохранения данных.Логика AddData выглядит следующим образом:

@Service("fooService")
public class FooServiceImpl {

    @Autowired
    FooRepository fooRepository; // Subinterface of JpaRepository

    @Transactional
    public Long AddData(Foo foo_incoming) {
        Optional<Foo> foo_check = fooRepository.findById(incoming.getId());
        Foo foo_exists;

        // Exists already?
        if (foo_check.isEmpty()) {
            // New Foo
            foo_exists = fooRepository.saveAndFlush(foo_incoming);
        } else {
            // Update existing foo
            foo_exists = foo_check.get();
            foo_exists.addToFieldA(foo_incoming.getFieldA());
            foo_exists.addToFieldB(foo_incoming.getFieldB());
        }

        return foo_exists.getId();
    }

}

Эта одна функция отвечает как за создание начальной записи для Foo, так и за обновление записи.

Когда запросы POSTвойдите, чтобы добавить данные к некоторому Foo с ID = 1, назовем его foo-1, который еще не существует, если они приходят с разумным промежутком времени между ними, первый запрос создаст начальную записьдля foo-1, и все последующие вызовы будут обновляться только.Т.е. проходит достаточно времени для saveAndFlush, чтобы фактически выполнить сброс в базу данных, поэтому последующие вызовы findById находят foo-1 в базе данных и переходят к блоку else и просто обновляют его поля.

Проблема, с которой я сталкиваюсь, заключается в том, что, когда N POST для одного и того же Foo (с одним и тем же идентификатором) отправляются в службу достаточно быстро, кажется, что все соответствующие вызовы AddData происходят одновременно.Таким образом, если foo-1 еще не существует, в каждом из этих вызовов AddData, findById(1) возвращает пустое значение.Так что saveAndFlush вызывается N раз для Foo с ID = 1, что поднимает DataIntegrityViolationException.

Я копался в Интернете несколько дней, пытаясь решить эту проблему.

  • Метод уже аннотирован @Transactional.Я пытался использовать @Transactional(isolation = Isolation.SERIALIZABLE) только для метода и для всего класса, это не помогает.
  • Я пытался комментировать методы findById и saveAndFlush в FooRepository с помощью @Lock(LockModeType.PESSIMISTIC_READ) и @Lock(LockModeType.PESSIMISTIC_WRITE), соответственно, не повезло.
  • Я пытался добавить поле @Version к Foo, без изменений.

Не могу понятькак заставить AddData происходить последовательно, я думал, что именно это и должно было делать @Transactional(isolation = Isolation.SERIALIZABLE).

Я подумываю дать "создать" и "обновить" их собственные функции - создание конечной точки PUT для создания,Но тогда у конечной точки PUT возникла бы аналогичная проблема - если бы я хотел попытаться предотвратить столкновения первичного ключа в коде, мне пришлось бы выполнить аналогичную проверку с findById перед выполнением saveAndFlush.Но то, как этот сервис фактически используется, конечная точка PUT, возможно, не подходит.

Упаковка saveAndFlush в блок try / catch делает , к моему удивлению, ловит исключение.Я мог бы попробовать какую-нибудь прикольную логику, чтобы попытаться вызвать findById снова, когда saveAndFlush не удастся, но если есть способ избежать создания исключения, я бы предпочел это.

Любые предложения будут оценены!

РЕДАКТИРОВАТЬ: Еще немного контекста, который может быть полезен.Этот микросервис работает в кластере Kubernetes, где потенциально может быть много экземпляров этой службы, обслуживающей запросы одновременно.Я все еще исследую обработку параллелизма нескольких экземпляров, и выясняю, что это не то, что мне нужно делать самостоятельно - моя команда разрабатывает несколько таких микросервисов, мы можем разработать общую библиотеку для решения таких проблем для всех них..

РЕДАКТИРОВАТЬ 2: я забыл, что на данный момент я использую базу данных H2 при запуске службы, а не реальную базу данных.Может быть, это как-то связано с этим?

И еще раз повторю, здесь происходит несколько вызовов для проверки базы данных на наличие foo-1 до того, как foo-1 еще существует ; из-за этого, я не думаю, что блокировка базы данных поможет мне здесь, потому что нет никакой блокировки от 1078 * до . Я думал, что принудительное выполнение AddData по очереди решит эту проблему, и я совершенно озадачен тем, почему добавление @Transactional(isolation = Isolation.SERIALIZABLE) к AddData не делает этого для меня.

Ответы [ 2 ]

0 голосов
/ 11 июля 2019

Существуют способы выгодно использовать параллелизм вместе с Jpa, но в целом невозможно одновременно делать вызовы Jpa.

Помните, что Jpa полагается на объекты класса, такие как EntityManager, Session, Connection, etc., которые не являются поточно-ориентированными. Они были разработаны таким образом, чтобы избежать условий гонки, тупиковой ситуации и всех проблем, которые могут возникнуть из-за многопоточности. При этом Jpa требует блокировки вызовов базы данных.

Как бы то ни было, вы определенно можете реализовать свою бизнес-логику одновременно с JPA-методами для повышения производительности, о которых вы, похоже, уже знаете. Я часто использую пулы / исполнители и все еще нахожу причину, чтобы предпочесть Jpa альтернативам. Во многих случаях время, необходимое для завершения операции Jpa, очень мало по сравнению со временем, которое требуется для создания данных, проверки и т. Д. Тем не менее, требуется некоторый компромисс, поскольку в конечном итоге потребуется каждый поток в многопоточном контексте сделать блокирующий вызов в цикле событий Jpa. Так что, насколько я знаю, Isolation.SERIALIZABLE, кажется, уже является наиболее значительным шагом в достижении того, что вы хотите.

Возможно, вы захотите взглянуть на R2dbc, реактивную реализацию JDBC, которая может помочь вам выполнить то, что вы пытаетесь сделать здесь. Он находится в разработке уже давно и приближается к статусу релиза. Последнее, что я слышал, это должно быть сделано в октябре, и моя команда уже начала конвертацию в отдельной ветке.

0 голосов
/ 11 июля 2019

Я не уверен, как вы можете достичь с помощью @Transactional, но есть альтернативный подход с использованием синхронизированного блока для решения вашей проблемы. Поскольку вы сказали, что ваш ключ будет уникальным, исходя из того, что вы узнаете, существует ли уже существующий объект, я предлагаю использовать его в качестве ключа для синхронизации вашего блока вставки / обновления.

Я использую String intern для возврата того же объекта, когда ваш ключ совпадает.

@Service("fooService")
public class FooServiceImpl {

    @Autowired
    FooRepository fooRepository; // Subinterface of JpaRepository

    @Transactional
    public Long AddData(Foo foo_incoming) {
        Optional<Foo> foo_check = fooRepository.findById(incoming.getId());
        String key = **incoming.getId().intern();**
        Foo foo_exists;
        **synchronized (key)** {
            // Exists already?
            if (foo_check.isEmpty()) {
                // New Foo
                foo_exists = fooRepository.saveAndFlush(foo_incoming);
            } else {
                // Update existing foo
                foo_exists = foo_check.get();
                foo_exists.addToFieldA(foo_incoming.getFieldA());
                foo_exists.addToFieldB(foo_incoming.getFieldB());
            }
        }

        return foo_exists.getId();
    }

}

Этот пост может помочь вам в дальнейшем. Синхронизация на объектах String в Java

...