java BlockingQueue не имеет блокировки заглядывать? - PullRequest
31 голосов
/ 19 ноября 2009

У меня есть очередь блокировки объектов.

Я хочу написать поток, который блокирует, пока в очереди не появится объект. Аналогичен функциональности, предоставляемой BlockingQueue.take ().

Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто посмотреть () и не удалять объект. Я хочу удалить объект, только если смог успешно обработать его.

Итак, мне нужна блокирующая функция peek (). В настоящее время peek () просто возвращается, если очередь пуста согласно javadocs.

Я что-то упустил? Есть ли другой способ достичь этой функциональности?

EDIT:

Есть какие-нибудь мысли о том, что я просто использую потокобезопасную очередь, заглянул и уснул вместо этого?

public void run() {
    while (!exit) {
        while (queue.size() != 0) {
            Object o =  queue.peek();
            if (o != null) {
                if (consume(o) == true) {
                    queue.remove();
                } else {
                    Thread.sleep(10000); //need to backoff (60s) and try again
                }
            }
        }
        Thread.sleep(1000); //wait 1s for object on queue
    }
}

Обратите внимание, что у меня есть только один потребительский поток и один (отдельный) поток производителя. Я думаю, это не так эффективно, как использование BlockingQueue ... Любые комментарии приветствуются.

Ответы [ 7 ]

14 голосов
/ 19 ноября 2009

Вы можете использовать LinkedBlockingDeque и физически удалить элемент из очереди (используя takeLast()), но заменить его снова в конце очереди , если обработка не удалась с использованием putLast(E e). Между тем ваши «производители» будут добавлять элементы в front очереди, используя putFirst(E e).

Вы всегда можете инкапсулировать это поведение в своей собственной реализации Queue и предоставить метод blockingPeek(), который выполняет takeLast(), за которым следует putLast() за кулисами базового LinkedBlockingDeque. Следовательно, с точки зрения вызывающего клиента элемент никогда не удаляется из вашей очереди.

6 голосов
/ 19 ноября 2009

Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто посмотреть () и не удалять объект. Я хочу удалить объект, только если смог успешно обработать его.

В общем, это не потокобезопасно. Что если после того, как вы peek() и определите, что объект может быть успешно обработан, но перед тем, как вы take() его удалите и обработаете, этот поток получит другой поток?

2 голосов
/ 19 ноября 2009

Единственное, что мне известно об этом, это BlockingBuffer в Apache Commons Collections :

Если вызывается get или remove пустой буфер, вызывающий поток ожидает уведомления о добавлении или Операция addAll завершена.

get() эквивалентен peek(), и Buffer можно заставить действовать как BlockingQueue, украшая UnboundedFifoBuffer с BlockingBuffer

1 голос
/ 19 ноября 2009

Быстрый ответ заключается в том, что на самом деле нет способа заглянуть в блокировку, если вы сами создаете очередь блокировки с блокировкой ().

Я что-то упустил?

peek () может вызвать проблемы с параллелизмом -

  • Если вы не можете обработать ваше сообщение peek () d - оно останется в очереди, если у вас нет нескольких потребителей.
  • Кто собирается вывести этот объект из очереди, если вы не можете его обработать?
  • Если у вас несколько потребителей, вы получаете состояние гонки между peek () и другим потоком, также обрабатывающим элементы, что приводит к дублированию обработки или хуже.

Звучит так, как будто вам лучше удалить предмет и обработать его, используя Схема ответственности

Edit: re: ваш последний пример: если у вас есть только 1 потребитель, вы никогда не избавитесь от объекта в очереди - если он не будет обновлен за это время - в этом случае вам лучше быть очень осторожным потокобезопасность и, вероятно, не должен был бы помещать элемент в очередь в любом случае.

1 голос
/ 19 ноября 2009

Не могли бы вы также просто добавить очередь прослушивателей событий в свою очередь блокировки, а затем, когда что-то будет добавлено в очередь (блокировки), отправить событие своим слушателям? Вы можете иметь свой блок потока, пока не будет вызван его метод actionPerformed.

0 голосов
/ 14 сентября 2018

Не является ответом как таковым, но: JDK-6653412 утверждает, что это недопустимый вариант использования.

0 голосов
/ 19 ноября 2009

Похоже, что само BlockingQueue не имеет указанной вами функциональности.

Я мог бы попытаться немного перефразировать проблему: что бы вы сделали с объектами, которые вы не можете «правильно обработать»? Если вы просто оставляете их в очереди, вам придется в какой-то момент вытащить их и разобраться с ними. Я бы рекомендовал либо выяснить, как их обрабатывать (обычно, если queue.get () дает какое-либо недопустимое или неверное значение, вы, вероятно, можете просто оставить его на полу), либо выбрать другую структуру данных ФИФО.

...