«Закрытие» очереди блокировки - PullRequest
27 голосов
/ 21 марта 2011

Я использую java.util.concurrent.BlockingQueue в очень простом сценарии производитель-потребитель. Например. этот псевдокод отображает потребительскую часть:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Пока все хорошо. В javadoc очереди блокировки я читаю:

BlockingQueue по своей сути не поддерживать любой вид "закрыть" или операция "выключения", чтобы указать, что больше ничего не будет добавлено. Нужды и использование таких функций, как правило, зависит от реализации. Например, общая тактика для производителей вставить специальный конец потока или яд объекты, которые интерпретируются соответственно, когда принимается потребителями.

К сожалению, из-за используемых обобщений и природы ComplexObject не просто закинуть «ядовитый объект» в очередь. Так что эта "общая тактика" не очень удобна в моем сценарии.

Мой вопрос: какие еще хорошие тактики / схемы можно использовать, чтобы "закрыть" очередь?

Спасибо!

Ответы [ 10 ]

19 голосов
/ 21 марта 2011

Если у вас есть дескриптор потока потребителя, вы можете прервать его.С кодом, который вы дали, это убьет потребителя.Я не ожидал бы, что у производителя будет это;вероятно, ему придется каким-то образом перезвонить контроллеру программы, чтобы он знал, что это сделано.Затем контроллер прервет поток потребителя.

Вы всегда можете закончить работу, прежде чем повиноваться прерыванию.Например:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

Я бы на самом деле предпочел бы так или иначе отравить очередь.Если вы хотите уничтожить поток, попросите его уничтожить себя.

(Приятно видеть, что кто-то правильно обрабатывает InterruptedException.)


Похоже, что существует некоторое утверждениеобработка прерываний здесь.Во-первых, я хотел бы, чтобы все прочитали эту статью: http://www.ibm.com/developerworks/java/library/j-jtp05236.html

Теперь, с пониманием, что никто на самом деле не читал это, вот сделка.Поток получит InterruptedException, только если в данный момент он блокируется во время прерывания.В этом случае Thread.interrupted() вернет false.Если это не было блокирование, оно НЕ получит это исключение, и вместо этого Thread.interrupted() вернет true.Следовательно, ваша защита от петель должна абсолютно, несмотря ни на что, проверять Thread.interrupted() или иным образом рисковать пропустить прерывание потока.

Итак, так как вы проверяете Thread.interrupted() независимо от того, что, и вы вынужденычтобы поймать InterruptedException (и должен иметь дело с этим, даже если вы не были вынуждены), теперь у вас есть две кодовые области, которые обрабатывают одно и то же событие, прерывание потока.Один из способов справиться с этим - нормализовать их в одно условие, означающее, что либо проверка логического состояния может вызвать исключение, либо исключение может установить логическое состояние.Я выбираю позже.


Редактировать: Обратите внимание, что статический метод Thread # прерывается очищает статус прерывания текущего потока.

11 голосов
/ 21 марта 2011

Еще одна идея сделать это просто:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

Затем используйте BlockingQueue<QueueableComplexObject> в качестве очереди. Если вы хотите закончить обработку очереди, выполните queue.offer(NullComplexObject.INSTANCE). На стороне потребителя сделайте

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

Нет instanceof требуется, и вам не нужно создавать подделку ComplexObject, которая может быть дорогой / сложной в зависимости от ее реализации.

9 голосов
/ 21 марта 2011

В качестве альтернативы можно заключить обработку, которую вы выполняете, с ExecutorService и позволить самому ExecutorService контролировать, будут ли добавляться задания в очередь.

В основном вы используете ExecutorService.shutdown(), который при вызове запрещает выполнение каких-либо дополнительных задач исполнителем.

Я не уверен, как вы в данный момент отправляете задачи на QueueConsumer в вашем примере. Я сделал предположение, что у вас есть какой-то метод submit(), и использовал аналогичный метод в примере.

import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

Этот пример является просто иллюстрацией того, что предлагает пакет java.util.concurrent; возможно, в него могут быть внесены некоторые оптимизации (например, QueueConsumer, поскольку его собственный класс, вероятно, даже не нужен; вы можете просто указать ExecutorService для всех производителей, отправляющих задачи).

Пролистайте пакет java.util.concurrent (начиная с некоторых ссылок выше). Вы можете обнаружить, что это дает вам множество отличных вариантов того, что вы пытаетесь сделать, и вам даже не нужно беспокоиться о регулировании рабочей очереди.

6 голосов
/ 21 марта 2011

Еще одна возможность создания ядовитого объекта: сделать его конкретным экземпляром класса. Таким образом, вам не нужно обойти подтипы или испортить свой универсальный.

Недостаток: это не сработает, если между производителем и потребителем существует какой-то барьер сериализации.

public class ComplexObject
{
    public static final POISON_INSTANCE = new ComplexObject();

    public ComplexObject(whatever arguments) {
    }

    // Empty constructor for creating poison instance.
    private ComplexObject() {
    }
}

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().interrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                if (complexObject == ComplexObject.POISON_INSTANCE)
                    return;

                // Process complex object.

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
}
3 голосов
/ 25 мая 2011

Вы можете обернуть ваш общий объект в объект данных.В этот объект данных вы можете добавить дополнительные данные, например, статус ядовитого объекта.Объект data - это класс с 2 полями.T complexObject; и boolean poison;.

Ваш потребитель берет объекты данных из очереди.Если возвращен ядовитый объект, вы закрываете потребителя, иначе вы разворачиваете универсальный объект и вызываете 'process (complexObject)'.

Я использую java.util.concurrent.LinkedBlockingDeque<E>, чтобы вы могли добавить объект в концеочереди и возьмите их с фронта.Таким образом, ваш объект будет обрабатываться по порядку, но что более важно, безопасно закрыть очередь после того, как вы столкнетесь с объектом-ядом.

Для поддержки нескольких потребителей я добавляю объект-яд обратно в очередь при запускевнутрь.

public final class Data<T> {
    private boolean poison = false;
    private T complexObject;

    public Data() {
        this.poison = true;
    }

    public Data(T complexObject) {
        this.complexObject = complexObject;
    }

    public boolean isPoison() {
        return poison;
    }

    public T getComplexObject() {
        return complexObject;
    }
}
public class Consumer <T> implements Runnable {

    @Override
    public final void run() {
        Data<T> data;
        try {
            while (!(data = queue.takeFirst()).isPoison()) {
                process(data.getComplexObject());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // add the poison object back so other consumers can stop too.
        queue.addLast(line);
    }
}
3 голосов
/ 21 марта 2011

Можно ли расширить ComplexObject и смоделировать нетривиальную функциональность создания? По сути, вы получаете объект оболочки, но затем можете сделать instance of, чтобы увидеть, является ли объект конца очереди.

1 голос
/ 22 июля 2012

Мне кажется разумным реализовать близкое BlockingQueue:

import java.util.concurrent.BlockingQueue;

public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
    /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
    public boolean isClosed();

    /** Closes this queue; elements cannot be added to a closed queue. **/
    public void close();
}

Было бы довольно просто реализовать это с помощью следующих действий (см. Сводную таблицу методов ).):

Я сделал это, отредактировав источник , найдите его на github.com .

0 голосов
/ 24 сентября 2013

Сегодня я решил эту проблему, используя объект-оболочку.Поскольку ComplexObject слишком сложен для подкласса, я обернул ComplexObject в объект ComplexObjectWrapper.Затем использовал ComplexObjectWrapper в качестве универсального типа.

public class ComplexObjectWrapper {
ComplexObject obj;
}

public class EndOfQueue extends ComplexObjectWrapper{}

Теперь вместо BlockingQueue<ComplexObject> я сделал BlockingQueue<ComplexObjectWrapper>

Так как у меня был контроль как для Потребителя, так и для Производителя, это решение работало для меня.

0 голосов
/ 28 мая 2013

Я использовал эту систему:

ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
    ...
    sharedQueue.drainTo(docs);
    ...
} while (queueIsNotEmpty || sharedQueue.isEmpty());

Когда производитель заканчивает, я устанавливаю для поля customerObject, queueIsNotEmpty значение false

0 голосов
/ 21 марта 2011

В этой ситуации вам, как правило, приходится отказываться от генериков и заставлять очередь содержать тип Object. затем вам нужно просто проверить свой «ядовитый» объект перед приведением к фактическому типу.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...