Блокировка очереди и многопоточного потребителя, как узнать, когда остановиться - PullRequest
65 голосов
/ 23 января 2012

У меня есть один производитель потоков, который создает некоторые объекты задач, которые затем добавляются в ArrayBlockingQueue (который имеет фиксированный размер).

Я также запускаю многопоточный потребитель.Это сборка как фиксированный пул потоков (Executors.newFixedThreadPool(threadCount);).Затем я отправляю несколько экземпляров ConsumerWorker в этот threadPool, каждый ConsumerWorker имеет ссылку на вышеупомянутый экземпляр ArrayBlockingQueue.

Каждый такой работник сделает take() в очереди и справится с задачей.

Моя проблема в том, как лучше всего знать, когда работник не будетбольше работы предстоит сделать.Другими словами, как мне сообщить рабочим, что производитель завершил добавление в очередь, и с этого момента каждый работник должен остановиться, когда увидит, что очередь пуста.

Что у меня естьТеперь это установка, в которой мой продюсер инициализируется с помощью обратного вызова, который срабатывает, когда он заканчивает свою работу (добавления чего-либо в очередь).Я также храню список всех созданных ConsumerWorkers и отправленных в ThreadPool.Когда обратный вызов продюсера говорит мне, что продюсер закончен, я могу сказать это каждому из работников.На этом этапе они должны просто продолжать проверять, не является ли очередь пустой, а когда она становится пустой, они должны остановиться, что позволяет мне корректно завершить работу пула потоков ExecutorService.Это что-то вроде

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

Проблема здесь в том, что у меня есть очевидное состояние гонки, когда иногда производитель заканчивает, сигнализирует об этом, и ConsumerWorkers останавливаются ДО того, как потреблять все вочередь.

Мой вопрос: каков наилучший способ синхронизации, чтобы все работало нормально?Должен ли я синхронизировать всю часть, где он проверяет, работает ли производитель, плюс, если очередь пуста, плюс взять что-то из очереди в одном блоке (в объекте очереди)?Должен ли я просто синхронизировать обновление логического значения isRunning на экземпляре ConsumerWorker?Любое другое предложение?

ОБНОВЛЕНИЕ, ЗДЕСЬ РАБОЧАЯ ОСУЩЕСТВЛЕНИЕ, КОТОРОЕ Я ЗАКОНЧИЛ, ИСПОЛЬЗУЯ:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

Это было в значительной степени вдохновлено ответом JohnVint ниже, столько некоторые незначительные изменения.

=== Обновление из-за комментария @ vendhan.

Спасибо за ваше внимание.Вы правы, первый фрагмент кода в этом вопросе имеет (среди прочих вопросов) тот, где while(isRunning || !inputQueue.isEmpty()) не имеет смысла.

В моей фактической окончательной реализации этого я делаю что-то, чтоближе к вашему предложению о замене "||"(или) с "&&" (и), в том смысле, что каждый работник (потребитель) теперь только проверяет, является ли элемент, который он получил из списка, ядовитой пилюлей, и если это так, останавливается (поэтому теоретически мы можем сказать, что работникчтобы быть запущенным И очередь не должна быть пустой).

Ответы [ 6 ]

85 голосов
/ 23 января 2012

Вы должны продолжить до take() из очереди.Вы можете использовать таблетку с ядом, чтобы сказать работнику остановиться.Например:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}
14 голосов
/ 23 января 2012

Я бы отправил рабочим специальный рабочий пакет, чтобы сообщить, что они должны выключиться:

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

Чтобы остановить работников, просто добавьте ConsumerWorker.DONE в очередь.

1 голос
/ 15 ноября 2017

Мы не можем сделать это, используя CountDownLatch, где размер - это количество записей в производителе.И каждый потребитель будет countDown после обработки записи.И он пересекает метод awaits(), когда все задачи завершены.Тогда остановите всех ваших потребителей.Поскольку все записи обрабатываются.

1 голос
/ 23 января 2012

В вашем кодовом блоке, где вы пытаетесь извлечь элемент из очереди, используйте poll(time,unit) вместо take().

try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 

Указывая соответствующие значения времени ожидания, вы гарантируете, что потоки не будутпродолжайте блокировать, если есть неудачная последовательность

  1. isRunning true
  2. Очередь становится пустой, поэтому потоки переходят в заблокированное ожидание (при использовании take()
  3. isRunning имеет значение false
0 голосов
/ 13 февраля 2017

Мне пришлось использовать многопоточный производитель и многопоточный потребитель. Я получил схему Scheduler -- N Producers -- M Consumers, каждый из которых связывается через очередь (всего две очереди). Планировщик заполняет первую очередь запросами на получение данных, а затем заполняет ее N «ядовитыми таблетками». Существует счетчик активных производителей (atomic int), и последний производитель, который получил последнюю таблетку с ядом, отправляет M таблеток с ядом в очередь потребителя.

0 голосов
/ 23 января 2012

Существует несколько стратегий, которые вы можете использовать, но одна из простых - иметь подкласс задач, который сигнализирует об окончании работы.Производитель не отправляет этот сигнал напрямую.Вместо этого он ставит в очередь экземпляр этого подкласса задачи.Когда один из ваших потребителей выполняет эту задачу и выполняет ее, это вызывает отправку сигнала.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...