Java BlockingQueue блокировка на take (), с небольшим поворотом - PullRequest
0 голосов
/ 04 января 2012

У меня есть ситуация, когда у меня есть 2 очереди блокировки. Сначала я вставляю некоторые задачи, которые я выполняю. Когда каждая задача завершается, она добавляет задачу во вторую очередь, где они выполняются.

Так что моя первая очередь проста: я просто проверяю, чтобы убедиться, что она не пуста, и выполняю, иначе я прерываю ():

public void run() {
    try {
        if (taskQueue1.isEmpty()) {
            SomeTask task = taskQueue1.poll();
            doTask(task);
            taskQueue2.add(task);
        }
        else {
            Thread.currentThread().interrupt();
        }
    }

    catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

Второй я делаю следующее, что, как вы можете сказать, не работает:

public void run() {
    try {
        SomeTask2 task2 = taskQueue2.take();
        doTask(task2);
    }

    catch (InterruptedException ex) {

    }
    Thread.currentThread().interrupt();

}

Как бы вы решили это так, чтобы второе BlockingQueue не блокировалось при take (), но завершалось только тогда, когда узнало, что больше нет элементов, которые нужно добавить. Было бы хорошо, если бы 2-й поток мог видеть 1-ую очередь блокировки, возможно, и проверить, была ли она пуста, а 2-я очередь также пуста, тогда он прервется.

Я мог бы также использовать объект Poison, но предпочел бы что-нибудь еще.

NB. Это не точный код, я просто написал здесь:

Ответы [ 2 ]

2 голосов
/ 04 января 2012

Я не могу понять, что вы на самом деле пытаетесь сделать здесь, но я могу сказать, что interrupt() в вашем первом run() методе либо бессмыслен, либо ошибочен.

  • Если вы используете метод run() в своем собственном объекте Thread, то этот поток все равно собирается выйти, поэтому нет смысла его прерывать.

  • Если вызапустив метод run() в исполнителе с пулом потоков, вы, скорее всего, вообще не захотите уничтожать поток или выключать исполнителя ... в этот момент.И если вы хотите завершить работу исполнителя, вам следует вызвать один из его методов завершения работы.


Например, вот версия, которая делает то, без чего вы, по-видимому, делаетевсе прерывания и без создания / уничтожения потоков.

public class TaskExecutor {

    private ExecutorService executor = new ThreadPoolExecutorService(...);

    public void submitTask1(final SomeTask task) {
        executor.submit(new Runnable(){
            public void run() {
                doTask(task);
                submitTask2(task);
            }
        });
    }

    public void submitTask2(final SomeTask task) {
        executor.submit(new Runnable(){
            public void run() {
                doTask2(task);
            }
        });
    }

    public void shutdown() {
        executor.shutdown();
    }
 }

Если вы хотите отдельную очередь для задач, просто создайте и используйте двух разных исполнителей.

1 голос
/ 04 января 2012

Вы звучите так, как будто поток, обрабатывающий первую очередь, знает, что больше задач не будет, как только его очередь будет очищена.Это звучит подозрительно, но я в любом случае верю вам на слово и предложу решение.

Определите AtomicInteger, видимый для обеих тем.Инициализируйте его как положительный .

Определите операцию потока first следующим образом:

  • Цикл на Queue#poll().
  • Если Queue#poll() возвращает ноль, вызовите AtomicInteger#decrementAndGet() для общего целого числа.
    • Если AtomicInteger#decrementAndGet() вернул ноль, прервите второй поток с помощью Thread#interrupt().(Это обрабатывает случай, когда никакие элементы никогда не поступали.)
    • В любом случае выйдите из цикла.
  • В противном случае обработайте извлеченный элемент, вызовите AtomicInteger#incrementAndGet() onобщее целое число, добавьте извлеченный элемент в очередь второго потока и продолжите цикл.

Определите операцию потока second следующим образом:

  • Блокировка цикла на BlockingQueue#take().
  • Если BlockingQueue#take() выдает InterruptedException, перехватить исключение, вызвать Thread.currentThread().interrupt() и выйти из цикла.
  • В противном случае обработать извлеченный элемент.
  • Вызовите AtomicInteger#decrementAndGet() на общем целом числе.
    • Если AtomicInteger#decrementAndGet() вернул ноль, выйдите из цикла.
    • В противном случае продолжите цикл.

Убедитесь, что вы понимаете идеюпрежде чем пытаться написать реальный код.Контракт заключается в том, что второй поток продолжает ожидать больше элементов из своей очереди, пока число ожидаемых задач не достигнет нуля.В этот момент производящий поток (первый) больше не будет помещать какие-либо новые элементы в очередь второго потока, поэтому второй поток знает, что безопасно прекратить обслуживание своей очереди.

Случай с винтом возникает, когда no задачи когда-либо поступают в очередь первого потока.Поскольку второй поток только уменьшает и проверяет счетчик после , он обрабатывает элемент, если он никогда не получает возможности обработать какие-либо элементы, он никогда не будет рассматривать остановку.Мы используем прерывание потока для обработки этого случая за счет другого условного перехода на этапах завершения цикла первого потока.К счастью, эта ветвь будет выполняться только один раз.

Есть много проектов, которые могут работать здесь.Я просто описал один, который ввел только одну дополнительную сущность - общее атомное целое число, - но даже тогда это сложно.Я думаю, что использование отравляющей таблетки было бы намного чище, хотя я допускаю, что ни Queue#add(), ни BlockingQueue#put() не принимают ноль в качестве допустимого элемента (из-за контракта на возвращаемую стоимость Queue#poll()).В противном случае было бы легко использовать ноль в качестве отравленной таблетки .

...