Параллельный процесс вставки данных в базу данных - PullRequest
1 голос
/ 14 июля 2009

Рассмотрим следующую схему в базе данных postgres.

CREATE TABLE employee
(
  id_employee serial NOT NULL PrimarKey,
  tx_email_address text NOT NULL Unique,
  tx_passwd character varying(256)
)

У меня есть класс Java, который выполняет следующие действия:

conn.setAutoComit(false);

ResultSet rs = stmt.("select * from employee where tx_email_address = 'test1'");
if (!rs.next()) {
    Insert Into employee Values ('test1', 'test1');
}
ResultSet rs = stmt.("select * from employee where tx_email_address = 'test2'");
if (!rs.next()) {
    Insert Into employee Values ('test2', 'test2');
}
ResultSet rs = stmt.("select * from employee where tx_email_address = 'test3'");
if (!rs.next()) {
    Insert Into employee Values ('test3', 'test3');
}
ResultSet rs = stmt.("select * from employee where tx_email_address = 'test4'");
if (!rs.next()) {
    Insert Into employee Values ('test4', 'test4');
}

conn.commit();
conn.setAutoComit(true);

Проблема здесь в том, что два или более параллельных экземпляра вышеупомянутой транзакции пытаются записать данные. Только одна транзакция в конечном итоге будет успешной, а остальные вызовут SQLException «нарушение ограничения уникального ключа». Как нам обойти это.

PS: я выбрал только одну таблицу и простые запросы на вставку, чтобы продемонстрировать проблему. Мое приложение - это Java-приложение, единственная цель которого - записать данные в целевую базу данных. и при этом может быть одновременный процесс, и существует очень высокая вероятность того, что какой-то процесс может пытаться записывать одни и те же данные (как показано в примере выше).

Ответы [ 7 ]

1 голос
/ 14 июля 2009

Сначала я бы попытался спроектировать поток данных таким образом, чтобы только одна транзакция когда-либо получала один экземпляр данных. В этом случае «нарушение ограничения уникального ключа» никогда не должно происходить и, следовательно, указывает на реальную проблему.

В противном случае я бы ловил и игнорировал «нарушение ограничения уникального ключа» после каждой вставки. Конечно, регистрация того, что это произошло, может быть хорошей идеей.

Если бы оба подхода были неосуществимы по какой-либо причине, то я бы, скорее всего, создал бы таблицу транзитных сообщений той же структуры, что и «сотрудник», но без ограничения первичного ключа и с полем «статус транзита». Никакое «нарушение ограничения уникального ключа» никогда не произойдет при вставке в эту транзитную таблицу. Требуется задание, которое считывает эту транзитную таблицу и передает данные в таблицу "employee". Это задание будет использовать «транзитный статус» для отслеживания обработанных строк. Я бы позволил работе делать разные вещи при каждом запуске:

  • выполнить оператор обновления для таблицы транзита, чтобы установить для «статуса транзита» значение «в процессе» для ряда строк. Насколько велико это число или если все текущие новые строки будут помечены, нужно подумать.
  • выполнить оператор обновления, который устанавливает «транзитный статус» на «дубликат» для всех строк, данные которых уже находятся в таблице «employee», а чей «транзитный статус» отсутствует («duplicate», «processing»)
  • повторять до тех пор, пока в транзитной таблице есть строки с «транзитным статусом» = «в процессе выполнения»:
    • выберите строку из таблицы транзита с «транзитным статусом» = «незавершенное производство».
    • Вставить данные этих строк в таблицу "employee".
    • Установите для этих строк "транзитный статус" в "обработано".
    • обновить все строки в транзитной таблице теми же данными, что и текущая обработанная строка, и для "статуса транзита" = "незавершенного производства" установлено значение "статус транзита" = "дубликат".

Скорее всего, я бы хотел, чтобы другая работа регулярно удаляла строки с "транзитным статусом" в ("дубликат", "обработан")

Если postgres не знает заданий базы данных, подойдет работа на стороне ОС.

1 голос
/ 22 января 2017

Решение состоит в том, чтобы использовать монопольную блокировку на уровне таблицы , блокировку записи для одновременного чтения, используя команду LOCK . Псевдо-SQL-код:

select * from employee where tx_email_address = 'test1';
if not exists
   lock table employee in exclusive mode;
   select * from employee where tx_email_address = 'test1';
   if still not exists //may be inserted before lock
      insert into employee values ('test1', 'test1');
      commit; //releases exclusive lock

Обратите внимание, что использование этого метода будет блокировать все другие записи до снятия блокировки, снижая пропускную способность.

Если все вставки зависят от родительской строки, то лучшим подходом является блокировка только родительской строки, сериализация дочерних вставок, а не блокировка всей таблицы.

1 голос
/ 14 июля 2009

Казалось бы, самый простой способ - использовать уровень изоляции транзакции 'serializable', который предотвращает фантомные чтения (другие люди вставляют данные, которые удовлетворяют предыдущему SELECT во время вашей транзакции).

if (!conn.getMetaData().supportsTransactionIsolationLevel(Connection.TRANSACTION_SERIALIZABLE)) {
    // OK, you're hosed. Hope for your sake your drivers supports this isolation level 
}
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);

Существуют также методы, такие как оператор Oracle «MERGE» - один оператор, который «вставляет или обновляет», в зависимости от наличия данных. Я не знаю, есть ли у Postgres аналог, но есть способы «подделать его», например, см. Как написать INSERT, ЕСЛИ НЕ СУЩЕСТВУЕТ запросы в стандартном SQL .

0 голосов
/ 01 октября 2009

Одним из способов решения этой конкретной проблемы является обеспечение того, чтобы каждый из отдельных потоков / экземпляров обрабатывал строки взаимоисключающим образом. Другими словами, если экземпляр 1 обрабатывает строки, где tx_email_address='test1', никакой другой экземпляр не должен обрабатывать эти строки снова.

Этого можно достичь, генерируя уникальный идентификатор сервера при запуске экземпляра и помечая строки, которые будут обработаны этим идентификатором сервера. Способ сделать это -

<LOOP>

  1. добавление 2 столбцов status и server_id в таблицу сотрудников.
  2. update employee set status='In Progress', server_id='<unique_id_for_instance>' where status='Uninitialized' and rownum<2
  3. совершить
  4. select * from employee where server_id='<unique_id_for_instance>' and status='In Progress'
  5. обработка строк, выбранных на шаге 4.

<END LOOP>

Выполнение приведенной выше последовательности шагов гарантирует, что все экземпляры виртуальной машины получат разные строки для обработки и не будет тупиков. Необходимо обновить до выбора, чтобы сделать операцию атомарной. В противном случае это может привести к проблемам с параллелизмом.

Надеюсь, это поможет

0 голосов
/ 14 июля 2009

Вы можете добавить управление параллелизмом на уровне приложения, сделав код критическим разделом:

synchronized(lock) {
  // Code to perform selects / inserts within database transaction.
}

Таким образом, один поток не может запрашивать таблицу, а другой - запрашивать и вставлять в таблицу. Когда первый поток завершается, второй поток входит в синхронизированный блок. Однако в этот момент каждая попытка выбора будет возвращать данные, и, следовательно, поток не будет пытаться вставить данные.

EDIT

В тех случаях, когда в одну и ту же таблицу вставляются несколько процессов, вы можете рассмотреть возможность снятия блокировки таблицы при выполнении транзакции, чтобы предотвратить начало других транзакций. Это фактически делает то же самое, что и код выше (то есть сериализацию двух транзакций), но на уровне базы данных. Очевидно, что это может повлиять на производительность.

0 голосов
/ 14 июля 2009

Вы можете предоставить открытый метод, который ставит в очередь операции записи и обрабатывает параллелизм очереди, а затем создает другой метод для запуска в другом потоке (или другом процессе полностью), который фактически выполняет запись последовательно.

0 голосов
/ 14 июля 2009

Часто используемая система должна иметь первичный ключ, который представляет собой UUID (уникальный универсальный идентификатор) и UUIDGenerator, см. http://jug.safehaus.org/ или подобные вещи, у Google есть много ответов

Это предотвратит ограничение уникального ключа

Но это несоответствие - только часть вашей проблемы, ваш tx_email_address все равно должен быть уникальным, и ничто не решает этого.

Нет способа предотвратить нарушение ограничения, если у вас есть параллелизм, вы столкнетесь с ним, и само по себе это действительно не проблема.

...