Как брать предметы из очереди кусками? - PullRequest
0 голосов
/ 17 мая 2018

У меня есть несколько потоков производителей, которые одновременно добавляют объекты в общую очередь.

Я хочу создать однопоточный потребитель, который читает из этой общей очереди для дальнейшей обработки данных (пакетная вставка базы данных).

Проблема: я хочу получать данные из очереди только частями для повышения производительности во время пакетной вставки.Таким образом, мне нужно как-то определить, сколько элементов в очереди, затем взять все эти элементы из очереди и снова очистить очередь.

 BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();

 ExecutorService pes = Executors.newFixedThreadPool(4);
 ExecutorService ces = Executors.newFixedThreadPool(1);

 pes.submit(new Producer(sharedQueue, 1));
 pes.submit(new Producer(sharedQueue, 2));
 pes.submit(new Producer(sharedQueue, 3));
 pes.submit(new Producer(sharedQueue, 4));
 ces.submit(new Consumer(sharedQueue, 1));

class Producer implements Runnable {
    run() {
            ...
            sharedQueue.put(obj);
    }
}

class Consumer implements Runnable {
    run() {
            ...
            sharedQueue.take();
    }
}

Вопрос для потребителя: как я могу опросить общий ресурс?очередь, дождаться очереди с элементами X, затем взять все элементы и одновременно очистить очередь (чтобы потребитель мог снова начать опрос и ожидание)?

Я открыт для любых предложений и не обязательно связан скод выше.

1 Ответ

0 голосов
/ 17 мая 2018

Вместо проверки размера очереди лучше создать внутреннюю List в потребителе, взять объекты из очереди и добавить в этот список.Когда в списке есть X элементов, вы выполняете обработку, а затем очищаете внутренний список.

class Consumer implements Runnable {
  private List itemsToProcess = new ArrayList();
  run() {
        while (true) { // or when producers are stopped and queue is empty
          while (itemsToPorcess.size() < X) {
            itemsToProcess.add(sharedQueue.take());
          }
          // do process
          itemsToProcess.clear();
        }
  }
}

И вместо BlockingQueue.take() вы можете использовать BlockingQueue.poll(timeout) с некоторым разумным временем ожидания и проверять результаты на null для обнаружения ситуации.когда все производители готовы и очередь пуста, чтобы можно было отключить вашего потребителя.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...