Java LinkedBlockingQueue со способностью сигнализировать, когда закончите? - PullRequest
6 голосов
/ 03 марта 2012

У меня есть ситуация, когда один производитель и один потребитель работают с очередью объектов. Есть две ситуации, когда очередь может быть пустой:

  1. Потребитель обрабатывал объекты быстрее, чем производитель мог генерировать новые объекты (производитель использует ввод-вывод перед созданием объектов).
  2. Производитель завершил создание объектов.

Если очередь пуста, я хочу, чтобы потребитель подождал, пока новый объект не станет доступен или пока производитель не сообщит, что это сделано.

До сих пор мои исследования не давали мне места, потому что я все еще заканчивал циклом, который проверяет и очередь, и отдельный логический флаг (isDone). Учитывая, что невозможно ожидать нескольких блокировок (например, ожидание в очереди И флаг), что можно сделать, чтобы решить эту проблему?

Ответы [ 5 ]

2 голосов
/ 03 марта 2012

Прежде всего, предположение, что использование обертки «слишком много накладных расходов», является предположением, а IMO - очень плохим. Это предположение должно быть измерено с тестом производительности с фактическими требованиями. Если и только если тест не пройден, то с помощью профилировщика убедитесь, что обертка объекта очереди является причиной.

Тем не менее, если вы сделаете это, и перенос объекта очереди (в данном случае String) действительно является причиной неприемлемой производительности, то вы можете использовать эту технику: создать известную уникальную строку, которая будет служить «концом сообщений» сообщение.

    public static final String NO_MORE_MESSAGES = UUID.randomUUID().toString();

Затем при извлечении строк из очереди просто проверьте (это может быть проверка ссылки), является ли строка NO_MORE_MESSAGES. Если это так, то вы закончили обработку.

1 голос
/ 03 марта 2012

Также была поднята следующая опция (не уверен, что это должно быть ответом для меня, но не мог найти лучшего места, чтобы написать это):

Создать упаковщик для очереди.Эта оболочка будет иметь монитор, который будет ожидаться при чтении потребителем, и производитель будет получать уведомления при каждом добавлении нового объекта или поднятии флага isDone.

Когда потребитель читает объекты изВ очереди эти объекты будут обернуты чем-то похожим на то, что @ yann-ramin предлагал выше.Однако, чтобы уменьшить накладные расходы, потребитель будет предоставлять один, повторно используемый экземпляр QueueMessage при каждом вызове чтения (это всегда будет один и тот же экземпляр).Оболочка очереди обновит поля соответствующим образом перед возвратом экземпляра потребителю.

Это позволяет избежать использования тайм-аутов, снов и т. Д.

EDITED Это предлагаемыйреализация:

/**
 * This work queue is designed to be used by ONE producer and ONE consumer
 * (no more, no less of neither). The work queue has certain added features, such
 * as the ability to signal that the workload generation is done and nothing will be
 * added to the queue.
 *
 * @param <E>
 */
public class DefiniteWorkQueue<E> {
    private final E[] EMPTY_E_ARRAY;
    private LinkedBlockingQueue<E> underlyingQueue = new LinkedBlockingQueue<E>();
    private boolean isDone = false;

    // This monitor allows for flagging when a change was done.
    private Object changeMonitor = new Object();

    public DefiniteWorkQueue(Class<E> clazz) {
        // Reuse this instance, makes calling toArray easier
        EMPTY_E_ARRAY = (E[]) Array.newInstance(clazz, 0);
    }

    public boolean isDone() {
        return isDone;
    }

    public void setIsDone() {
        synchronized (changeMonitor) {
            isDone = true;
            changeMonitor.notifyAll();
        }
    }

    public int size() {
        return underlyingQueue.size();
    }

    public boolean isEmpty() {
        return underlyingQueue.isEmpty();
    }

    public boolean contains(E o) {
        return underlyingQueue.contains(o);
    }

    public Iterator<E> iterator() {
        return underlyingQueue.iterator();
    }

    public E[] toArray() {
        // The array we create is too small on purpose, the underlying
        // queue will extend it as needed under a lock
        return underlyingQueue.toArray(EMPTY_E_ARRAY);
    }

    public boolean add(E o) {
        boolean retval;
        synchronized (changeMonitor) {
            retval = underlyingQueue.add(o);
            if (retval)
                changeMonitor.notifyAll();
        }
        return retval;
    }

    public boolean addAll(Collection<? extends E> c) {
        boolean retval;
        synchronized (changeMonitor) {
            retval = underlyingQueue.addAll(c);
            if (retval)
                changeMonitor.notifyAll();
        }
        return retval;
    }

    public void remove(RemovalResponse<E> responseWrapper) throws InterruptedException {
        synchronized (changeMonitor) {
            // If there's nothing in the queue but it has not
            // ended yet, wait for someone to add something.
            if (isEmpty() && !isDone())
                changeMonitor.wait();

            // When we get here, we've been notified or
            // the current underlying queue's state is already something
            // we can respond about.
            if (!isEmpty()) {
                responseWrapper.type = ResponseType.ITEM;
                responseWrapper.item = underlyingQueue.remove();
            } else if (isDone()) {
                responseWrapper.type = ResponseType.IS_DONE;
                responseWrapper.item = null;
            } else {
                // This should not happen
                throw new IllegalStateException(
                    "Unexpected state where a notification of change was     made but " +
                        "nothing is in the queue and work is not     done.");
            }
        }
    }

    public static class RemovalResponse<E> {
        public enum ResponseType {
            /**
             * Used when the response contains the first item of the queue.
             */
            ITEM,

            /**
             * Used when the work load is done and nothing new will arrive.
             */
            IS_DONE
        };

        private ResponseType type;
        private E item;

        public ResponseType getType() {
            return type;
        }

        public void setType(ResponseType type) {
            this.type = type;
        }

        public E getItem() {
            return item;
        }

        public void setItem(E item) {
            this.item = item;
        }

    }
}
1 голос
/ 03 марта 2012

Один из вариантов - обернуть ваши данные в объект-держатель, который можно использовать для оповещения об окончании обработки.

Например:

public class QueueMessage {
 public MessageType type;
 public Object thingToWorkOn;
}

где MessageType - это перечисление, определяющее «рабочее» сообщение или «завершение работы».

1 голос
/ 03 марта 2012

Вы можете использовать LinkedBlockingQueues poll(long timeout, TimeUnit unit) -метод в потребителе, и если он возвращает ноль (истек тайм-аут), проверьте логический флаг. Другим способом было бы передать некоторый специальный объект «EndOfWork» в очередь как последний, так что потребитель знает, что это конец работы.

Еще одним способом было бы прерывание потока потребителя из потока производителя, но это потребовало бы, чтобы поток производителя был осведомлен о потребителе. Если они оба будут реализованы как вложенные классы, вы можете использовать родительский класс для хранения логического значения, к которому оба могут обращаться, и завершать оба потока одним логическим значением.

1 голос
/ 03 марта 2012

Simple.Определите специальный объект, который производитель может отправить в сигнал «сделано».

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...