Общение конец очереди - PullRequest
       3

Общение конец очереди

9 голосов
/ 31 августа 2010

Я учусь использовать модуль очереди, и меня немного смущает вопрос о том, как можно заставить поток потребителя очереди узнать, что очередь заполнена.В идеале я хотел бы использовать get() из потока потребителя, чтобы он выдавал исключение, если очередь помечена как «выполнено».Есть ли лучший способ сообщить об этом, чем путем добавления значения часового, чтобы отметить последний элемент в очереди?

Ответы [ 5 ]

9 голосов
/ 09 сентября 2010

оригинал (большая часть этого изменилась; см. Обновления ниже)

На основании некоторых предложений (спасибо!) Из Гленна Мейнарда и других, ярешил свернуть потомка Queue.Queue, который реализует метод close.Он доступен в виде примитива (без упаковки) модуль .Я уберу это немного и упакую должным образом, когда у меня будет немного больше времени.На данный момент модуль содержит только класс CloseableQueue и класс исключений Closed.Я планирую расширить его, включив в него также подклассы Queue.LifoQueue и Queue.PriorityQueue.

В настоящее время он находится в довольно предварительном состоянии, то есть, несмотря на то, что он проходит свой набор тестов, я нена самом деле использовал его для чего-либо еще.Ваш пробег может варьироваться.Я буду держать этот ответ обновленным с захватывающими новостями.

Класс CloseableQueue немного отличается от предложения Гленна в том, что закрытие очереди предотвратит будущие put с, но не предотвратит будущие get с, покаочередь опустошена.Это имело для меня наибольшее значение;казалось, что функциональность для очистки очереди может быть добавлена ​​как отдельный миксин *, который будет ортогонален функциональности закрываемости.Таким образом, в основном с CloseableQueue, закрывая очередь, вы указываете, что последний элемент был put.Также есть возможность сделать это атомарно, передав last=True на последний put вызов.Последующие вызовы put и последующие вызовы get после освобождения очереди, а также невыполненные заблокированные вызовы, соответствующие этим описаниям, вызовут исключение Closed.

Это в основном полезно в ситуацияхгде один производитель генерирует данные для одного или нескольких потребителей, но это также может быть полезно для мульти-мультипроекта, где потребители ждут определенного товара или набора товаров.В частности, это не дает возможность определить, что все несколько производителей завершили производство.Получение этой работы повлекло бы за собой предоставление некоторого способа регистрации производителей (.open()?), А также способа указать, что регистрация производителя сама закрыта.

Предложения и / или проверки кода весьма приветствуются.Я не написал много кода для параллелизма, но, надеюсь, набор тестов достаточно тщательный, чтобы тот факт, что код проходит его, является показателем качества кода, а не его отсутствия.Мне удалось повторно использовать код из набора тестов модуля Queue: сам файл включен в этот модуль и используется в качестве основы для различных подклассов и процедур, включая регрессионное тестирование.Это, вероятно (надеюсь) помогло избежать полной неумелости в отделе тестирования.Сам код просто переопределяет Queue.get и Queue.put с довольно минимальными изменениями и добавляет методы close и closed.

Я вроде как намеренно избегал использования любой новой фантазии, такой как контекстменеджеры как в самом коде, так и в наборе тестов, чтобы сохранить код как обратно совместимый, так же как и сам модуль Queue, который действительно значительно обратный.Возможно, в какой-то момент я добавлю методы __enter__ и __exit__;в противном случае функция закрытия для contextlib должна быть применима к экземпляру CloseableQueue.

*: Здесь я свободно использую термин «миксин».Так как классы модуля Queue выполнены в старом стиле, миксины необходимо будет смешивать с использованием функций фабрики классов;применяются некоторые ограничения;предложение void там, где это запрещено Гвидо.

update

Модуль CloseableQueue теперь предоставляет классы CloseableLifoQueue и CloseablePriorityQueue.Я также добавил несколько вспомогательных функций для поддержки итерации.Еще нужно переделать его как надлежащий пакет.Есть функция фабрики классов, которая позволяет удобно создавать подклассы для других Queue.Queue классов.

update 2

CloseableQueue теперь доступно через PyPI , например, с

$ easy_install CloseableQueue

Комментарии и критика приветствуются, особенно от анонимного downvoter этого ответа.

8 голосов
/ 31 августа 2010

Очереди по своей сути не имеют идеи быть завершенным или готовым. Их можно использовать бесконечно. Чтобы закрыть его, когда вы закончите, вам действительно понадобится поставить None или какое-то другое магическое значение в конце и написать логику для проверки, как вы описали. Идеальным способом, вероятно, было бы создание подкласса объекта Queue.

См. http://en.wikipedia.org/wiki/Queue_(data_structure), чтобы узнать больше об очереди в целом.

6 голосов
/ 31 августа 2010

Часовой - это естественный способ закрыть очередь, но есть несколько вещей, на которые стоит обратить внимание.

Во-первых, помните, что у вас может быть несколько потребителей, поэтому вам нужно отправитьодин раз для каждого работающего потребителя и гарантируйте, что каждый потребитель будет использовать только одного стража, чтобы каждый получатель получал свой часовой выключатель.

Во-вторых, помните, что очередь определяет интерфейс и что, когда это возможно, код долженвести себя независимо от базовой очереди.У вас может быть PriorityQueue или другой класс, который предоставляет тот же интерфейс и возвращает значения в каком-то другом порядке.

К сожалению, с ними обоими трудно справиться.Чтобы иметь дело с общим случаем различных очередей, потребитель, который выключается, должен продолжать потреблять значения после получения своего стража выключения, пока очередь не станет пустой.Это означает, что он может потреблять страж другого потока.Это слабое место в интерфейсе очереди: у него должен быть вызов Queue.shutdown, чтобы все потребители вызывали исключение, но его не хватает.

На практике:

  • если вы уверены, что когда-либо используете только обычную очередь, просто отправьте по одному стражу на поток.
  • , если вы можете использовать PriorityQueue, убедитесь, что страж имеет самый низкий приоритет.
1 голос
/ 31 августа 2010

Очередь - это регистр FIFO (первым пришел - первым обслужен), поэтому помните, что потребитель может быть быстрее, чем производитель. Когда поток потребителей обнаруживает, что очередь пуста, обычно выполняют одно из следующих действий:

  1. Отправить в API: перейти к следующей теме.
  2. Отправить в API: поспать несколько мс и проверить снова очередь.
  3. Отправить в API: ожидание события (например, новое сообщение в очереди).

Если вы не хотите, чтобы поток потребителей завершал работу после завершения задания, то помещаете в очередь значение Sentinel для завершения задания.

0 голосов
/ 31 августа 2010

Лучший способ сделать это состоит в том, чтобы сама очередь уведомляла клиента о том, что она достигла состояния «выполнено».Затем клиент может предпринять любое подходящее действие.

То, что вы предложили;проверка очереди, чтобы увидеть, если это делается периодически, было бы крайне нежелательно.Опрос является антипаттерном в многопоточном программировании, вы всегда должны использовать уведомления.

РЕДАКТИРОВАТЬ:
Итак, вы говорите, что сама очередь знает, что она «выполнена» на основе некоторых критериев и должна уведомлять клиентов оэтот фактЯ думаю, что вы правы, и лучший способ сделать это - бросить, когда клиент вызывает get () и очередь находится в состоянии готово.Если ваш бросок это исключит необходимость в дозорном значении на стороне клиента.Внутренне очередь может обнаруживать, что она «выполнена» любым удобным для нее способом, например, очередь пуста, ее состояние установлено на «выполнено» и т. Д. Я не вижу необходимости в значении часового.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...