У меня есть один производитель потоков, который создает некоторые объекты задач, которые затем добавляются в ArrayBlockingQueue
(который имеет фиксированный размер).
Я также запускаю многопоточный потребитель.Это сборка как фиксированный пул потоков (Executors.newFixedThreadPool(threadCount);
).Затем я отправляю несколько экземпляров ConsumerWorker в этот threadPool, каждый ConsumerWorker имеет ссылку на вышеупомянутый экземпляр ArrayBlockingQueue.
Каждый такой работник сделает take()
в очереди и справится с задачей.
Моя проблема в том, как лучше всего знать, когда работник не будетбольше работы предстоит сделать.Другими словами, как мне сообщить рабочим, что производитель завершил добавление в очередь, и с этого момента каждый работник должен остановиться, когда увидит, что очередь пуста.
Что у меня естьТеперь это установка, в которой мой продюсер инициализируется с помощью обратного вызова, который срабатывает, когда он заканчивает свою работу (добавления чего-либо в очередь).Я также храню список всех созданных ConsumerWorkers и отправленных в ThreadPool.Когда обратный вызов продюсера говорит мне, что продюсер закончен, я могу сказать это каждому из работников.На этом этапе они должны просто продолжать проверять, не является ли очередь пустой, а когда она становится пустой, они должны остановиться, что позволяет мне корректно завершить работу пула потоков ExecutorService.Это что-то вроде
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
Проблема здесь в том, что у меня есть очевидное состояние гонки, когда иногда производитель заканчивает, сигнализирует об этом, и ConsumerWorkers останавливаются ДО того, как потреблять все вочередь.
Мой вопрос: каков наилучший способ синхронизации, чтобы все работало нормально?Должен ли я синхронизировать всю часть, где он проверяет, работает ли производитель, плюс, если очередь пуста, плюс взять что-то из очереди в одном блоке (в объекте очереди)?Должен ли я просто синхронизировать обновление логического значения isRunning
на экземпляре ConsumerWorker?Любое другое предложение?
ОБНОВЛЕНИЕ, ЗДЕСЬ РАБОЧАЯ ОСУЩЕСТВЛЕНИЕ, КОТОРОЕ Я ЗАКОНЧИЛ, ИСПОЛЬЗУЯ:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Это было в значительной степени вдохновлено ответом JohnVint ниже, столько некоторые незначительные изменения.
=== Обновление из-за комментария @ vendhan.
Спасибо за ваше внимание.Вы правы, первый фрагмент кода в этом вопросе имеет (среди прочих вопросов) тот, где while(isRunning || !inputQueue.isEmpty())
не имеет смысла.
В моей фактической окончательной реализации этого я делаю что-то, чтоближе к вашему предложению о замене "||"(или) с "&&" (и), в том смысле, что каждый работник (потребитель) теперь только проверяет, является ли элемент, который он получил из списка, ядовитой пилюлей, и если это так, останавливается (поэтому теоретически мы можем сказать, что работникчтобы быть запущенным И очередь не должна быть пустой).