Нужна помощь в разработке "бесконечных" потоков - PullRequest
4 голосов
/ 23 февраля 2009

У меня есть какая-то таблица базы данных, и мне нужно обрабатывать записи из нее 5 одновременно, пока приложение работает. Итак, это выглядит так:

  1. Получить запись, которая еще не была обработана или не обрабатывается другими потоками.
  2. Обработайте его (это длительный процесс, который зависит от подключения к Интернету, поэтому он может тайм-аут / выбрасывать ошибки).
  3. Перейти к следующей записи. Когда достигнут конец таблицы, начинайте с начала.

У меня нет большого опыта работы с потоками, поэтому я вижу две возможные стратегии:

Подход А.

1. Создать новый ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5);

2.Добавить к нему 5 заданий:

for (int i = 0; i < 5; i++) {
    taskExecutor.execute(new MyTask());
}

3.Каждое задание будет бесконечным циклом, который: читает запись из таблицы, обрабатывает ее, а затем получает другую запись.

Проблема этого подхода заключается в том, как сообщить другим потокам о том, какие записи обрабатываются в данный момент. Для этого я могу либо использовать поле «status» в таблице, либо просто использовать некоторый CopyOnWriteArraySet, в котором хранятся идентификаторы, обрабатываемые в данный момент.

Подход B.

1.Создать тот же ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5);

2. Иметь бесконечный цикл, который выбирает записи, которые необходимо обработать, и передает их исполнителю:

while (true) {
    //get next record here
    taskExecutor.execute(new MyTask(record));
    //monitor the queue and wait until some thread is done processing,
    //so I can add another record
}

3.Каждое задание обрабатывает одну запись.

Проблема этого подхода заключается в том, что мне нужно добавлять задачи в очередь исполнителя медленнее, чем они обрабатываются, чтобы не допустить их накапливания со временем. Это означает, что мне нужно следить не только за тем, какие задачи выполняются в данный момент, но и когда они завершаются, поэтому я могу добавлять новые записи в очередь.

Лично я думаю, что первый подход лучше (легче), но я чувствую, что второй подход более правильный. Как вы думаете? Или, может быть, я должен сделать что-то совершенно другое?

Также я могу использовать библиотеки Spring или Quartz для этого при необходимости.

Спасибо.

Ответы [ 5 ]

5 голосов
/ 23 февраля 2009

Я думаю, что CompletionService ExecutorCompletionService ) могут вам помочь.

Вы отправляете все свои задачи через службу завершения, и это позволяет вам ждать, пока один из потоков (любой поток) не завершит свою задачу. Таким образом, вы можете отправить следующую задачу, как только появится свободная тема. Это будет означать, что вы используете подход B.

Псевдокод:

Create ThreadPoolExecutor and ExecutorCompletionService wrapping it

while (true) {
  int freeThreads = executor.getMaximumPoolSize() - executor.getActiveCount()
  fetch 'freeThreads' tasks and submit to completion service (which
                                      in turn sends it to executor)

  wait until completion service reports finished task (with timeout)
}

Тайм-аут в ожидании помогает вам избежать ситуации, когда в очереди не было задачи, поэтому все потоки простаивают, и вы ожидаете, пока один из них не завершится - что никогда не произойдет.

Вы можете проверить количество свободных тем с помощью ThreadPoolExecutor методами: getActiveCount (активные темы) и getMaximumPoolSize (максимум доступных настроенных тем). Вам нужно будет создать ThreadPoolExecutor напрямую или привести объект, возвращенный из Executors.newFixedThreadPool (), хотя я бы предпочел непосредственное создание ... подробности см. В источнике метода Executors.newFixedThreadPool ().

4 голосов
/ 23 февраля 2009

Альтернативой является использование ArrayBlockingQueue размера 5. Один поток производителей будет проходить по таблице, первоначально заполняя ее и вставляя записи по мере их обработки потребителями. Каждый из пяти пользовательских потоков возьмет () запись, обработает ее и вернется к другой записи. Таким образом, поток производителя гарантирует, что запись не передается двум потокам одновременно, а потоки потребителя работают с независимыми записями. Java Concurrency на практике , вероятно, предоставит вам гораздо больше возможностей и отлично подходит для решения проблем такого типа.

1 голос
/ 23 февраля 2009

У меня была бы статическая коллекция в MyTask

public class MyTask implements Runnable {
  private static ArrayList<RecordID> processed = new ArrayList<RecordID>();
  private static ArrayList<RecordID> processing = new ArrayList<RecordID>();

  private RecordID working = null;

  public void run() {
    for(;;) {
      synchronized( MyTask.class ) {
        Record r = getUnprocessedRecord(); // use processed and processing to do query
        if ( r == null ) {  // no more in table to process
          if ( processing.length == 0 ) { // nothing is processing
            processed.clear();  // this should allow us to get some results on the next loop
          }
          Thread.sleep( SLEEP_INTERVAL );
          continue;
        } else {
          working = r.getRecordID();
          processing.add( working );
        }
      }
      try {
        //do work
        synchronized( MyTask.class ) {
          processed.add(working);
        }
      } catch( Whatever w ){
      } finally {
        synchronized( MyTask.class ) {
          processing.remove(working);
        }
      } 
    }
  }

}

1 голос
/ 23 февраля 2009

Я бы пошел с таким подходом:

Используйте один поток, чтобы распределить работу. Эта нить будет порождать 5 других тем и идти спать. Когда рабочий поток завершает свою работу, он пробуждает рабочий поток распределителя, который затем порождает новый рабочий поток и переходит в спящий режим ...

0 голосов
/ 23 февраля 2009

Мое личное мнение, иди на КВАРЦ с весны. Это идеальный выбор. Использую его на производстве уже более 2 лет. Зачем пытаться заново изобрести колесо, когда некоторые люди уже делают это лучше всего. Не говоря уже о различных режимах его работы. Я бы посоветовал хотя бы попробовать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...