Как немедленно освободить темы, ожидающие в BlockingQueue - PullRequest
9 голосов
/ 22 июля 2010

Рассмотрим BlockingQueue и несколько потоков, ожидающих на poll(long, TimeUnit) (возможно, также на take()).

Теперь очередь пуста, и желательно уведомить ожидающие потоки, что они могут прекратить ожидание. Ожидаемое поведение - возвращать либо null, либо объявленный InterruptedException.

Object.notify() не будет работать для LinkedBlockingQueue, поскольку потоки ожидают внутренней блокировки.

Есть какой-нибудь прямой путь?

Ответы [ 3 ]

13 голосов
/ 22 июля 2010

Javadoc для BlockingQueue предлагает хороший способ:

BlockingQueue по своей сути не поддерживать любой вид "закрыть" или операция "выключения", чтобы указать, что больше ничего не будет добавлено. Нужды и использование таких функций, как правило, зависит от реализации. Например, общая тактика для производителей вставить специальный конец потока или яд объекты, которые интерпретируются соответственно, когда принимается потребителями.

5 голосов
/ 22 июля 2010

Обычный способ - прерывать потоки, но это, конечно, требует, чтобы они правильно обрабатывали прерывания.

Это означает, что нужно правильно ловить и обрабатывать InterruptedException вокруг методов блокировки, а в противном случае регулярно проверять (и действовать) флаг interrupted.

В спецификации API или языка нет ничего, что связывало бы прерывание с какой-либо конкретной семантикой отмены, но на практике использование прерывания для чего угодно, кроме отмены, хрупко и трудно поддерживать в больших приложениях. [...]

Прерывание, как правило, является наиболее разумным способом осуществления отмены.

Говорит Параллелизм Java на практике в разделе 7.1.1. Пример правильной обработки прерывания из того же самого (это поток производителя, а не потребитель, но это различие незначительно в текущем контексте):

class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /*  Allow thread to exit  */
        }
    }
    public void cancel() { interrupt(); }
}

Альтернативным решением было бы установить разумно низкий параметр тайм-аута poll, чтобы поток регулярно просыпался и мог достаточно быстро замечать прерывания. Тем не менее, я считаю, что всегда полезно явно обрабатывать InterruptedException в соответствии с вашей политикой отмены потоков.

1 голос
/ 22 июля 2010

Я бы сказал, что с вашим дизайном что-то не так.Потоки, использующие BlockingQueue, не должны прерываться таким образом.Если они должны делать что-то еще через регулярные промежутки времени (например, проверять состояние переменной), в то же время потребляя из очереди, то вам следует использовать метод poll () с соответствующим временем ожидания, чтобы эти два действия могли чередоваться.1001 *

...