У меня есть несколько потоков производителей, которые одновременно добавляют объекты в общую очередь.
Я хочу создать однопоточный потребитель, который читает из этой общей очереди для дальнейшей обработки данных (пакетная вставка базы данных).
Проблема: я хочу получать данные из очереди только частями для повышения производительности во время пакетной вставки.Таким образом, мне нужно как-то определить, сколько элементов в очереди, затем взять все эти элементы из очереди и снова очистить очередь.
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
ExecutorService pes = Executors.newFixedThreadPool(4);
ExecutorService ces = Executors.newFixedThreadPool(1);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
pes.submit(new Producer(sharedQueue, 3));
pes.submit(new Producer(sharedQueue, 4));
ces.submit(new Consumer(sharedQueue, 1));
class Producer implements Runnable {
run() {
...
sharedQueue.put(obj);
}
}
class Consumer implements Runnable {
run() {
...
sharedQueue.take();
}
}
Вопрос для потребителя: как я могу опросить общий ресурс?очередь, дождаться очереди с элементами X, затем взять все элементы и одновременно очистить очередь (чтобы потребитель мог снова начать опрос и ожидание)?
Я открыт для любых предложений и не обязательно связан скод выше.