Буферизация БД в многопоточной программе - PullRequest
4 голосов
/ 11 мая 2010

У меня есть система, которая разбивает большие такты на маленькие задачи, используя около 30 потоков одновременно. По завершении каждого отдельного потока он сохраняет свои рассчитанные результаты в базе данных. Я хочу, чтобы каждый поток передавал свои результаты новому классу сохранения, который будет выполнять тип двойной буферизации и сохранения данных при работе в своем собственном потоке.

Например, после того, как 100 потоков переместили свои данные в буфер класса сохраняемости, класс сохраняемости меняет местами буферы и сохраняет все 100 записей в базе данных. Это позволило бы использовать подготовленные операторы и, таким образом, сократить объем операций ввода-вывода между программой и базой данных.

Есть ли образец или хороший пример этого типа многопоточной двойной буферизации?

Ответы [ 2 ]

4 голосов
/ 11 мая 2010

Я видел этот шаблон, называемый асинхронной записью в базу данных или шаблоном обратной записи. Это типичный шаблон, поддерживаемый продуктами распределенного кэша (Teracotta, Coherence, GigaSpaces, ...), потому что вы не хотите, чтобы обновления вашего кэша также включали запись изменений в базовую базу данных.

Сложность этого шаблона зависит от вашей терпимости к потерянным обновлениям базы данных. Из-за задержки между завершением работы и записью результата в базу данных вы можете потерять обновления из-за ошибок, сбоев питания, ... (вы получаете картину).

Я бы предложил какую-то очередь для завершенных результатов, которые будут записаны в БД, а затем обработал бы их партиями по 100 (на вашем примере) ИЛИ через некоторое время. Причина также использования временной задержки заключается в том, чтобы справиться с наборами результатов, которые не делятся на 100.

Если у вас нет требований к устойчивости / долговечности, вы можете сделать все это в одном процессе. Однако, если вы не можете допустить потери, вы можете заменить очередь in-vm на постоянную очередь JMS (медленнее, но безопаснее).

1 голос
/ 11 мая 2010

Чтобы снизить накладные расходы на синхронизацию, используйте локальный поток (для каждого вычислительного потока) для создания пакетов результатов. По достижении определенного количества результатов поставьте пакет в очередь блокировки. Используйте ArrayBlockingQueue для поддержки вашего класса персистентности, поскольку вы, вероятно, не хотите, чтобы использование вашей памяти стало неограниченным. Вы можете иметь несколько потоков писателя базы данных, берущих группы результатов и сохраняющих их в базу данных.

class WriteBehindPersister {
 ThreadLocal<List<Result>> internalBuffer;
 static ArrayBlockingQueue<List<Result>> persistQueue;
 static {
   persistQueue = new ArrayBlockingQueue(10);
   new WriteThread().start();
 }    

 public WriteBehindPersister() {
  internalBuffer = new ThreadLocal<List<Result>>();
 }

 public void persist(Result r) {
  List<Result> localResult = internalBuffer.get();
  localResult.add(r);
  if (localResult.size() > max) {
   persistQueue.put(new ArrayList(localResult));
   localResult.clear();
  }
 }

 class WriteThread extends Thread {
  public void run() {
   while (true) {
    List<Result> batch = persistQueue.take();
    beginTransaction();
    for (Result r : batch) {
     batchInsert(r);
    }
    endTransaction();
   }
  }
 }

}

Кроме того, вы можете использовать службу executor (вместо одного потока записи) для одновременного сохранения нескольких пакетов в БД в обмен на использование более одного соединения с БД. Обязательно используйте API пакетной обработки JDBC, если ваш драйвер его поддерживает.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...