Получить все предметы из очереди сообщений - PullRequest
14 голосов
/ 01 октября 2008

У меня есть один поток, который записывает результаты в очередь.

В другом потоке (GUI) я периодически (в событии IDLE) проверяю, есть ли результаты в очереди, например:

def queue_get_all(q):
    items = []
    while 1:
        try:
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

Это хороший способ сделать это?

Edit:

Я спрашиваю, потому что иногда ожидающий поток застревает на несколько секунд, не вынимая новый Результаты.

Проблема «зависания» оказалась из-за того, что я выполнял обработку в обработчике неактивных событий, не убедившись, что такие события действительно генерируются путем вызова wx.WakeUpIdle, как рекомендуется.

Ответы [ 6 ]

15 голосов
/ 01 октября 2008

Если вы всегда вытаскиваете все доступные элементы из очереди, есть ли смысл использовать очередь, а не просто список с блокировкой? то есть:

from __future__ import with_statement
import threading

class ItemStore(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        with self.lock:
            self.items.append(item)

    def getAll(self):
        with self.lock:
            items, self.items = self.items, []
        return items

Если вы также извлекаете их по отдельности и используете блокирующее поведение для пустых очередей, вам следует использовать Очередь, но ваш вариант использования выглядит намного проще и может быть лучше реализован с помощью вышеуказанного подхода.

[Edit2] Я пропустил тот факт, что вы опрашиваете очередь из цикла простоя, и из вашего обновления я вижу, что проблема не связана с конфликтом, поэтому ниже подход не имеет отношения к вашей проблеме. Я оставил это в случае, если кто-нибудь найдет вариант блокировки этого полезного:

В тех случаях, когда вы хотите заблокировать, пока не получите хотя бы один результат, вы можете изменить приведенный выше код, чтобы дождаться, когда данные станут доступны благодаря сигналу потока производителя. Например.

class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify() # Wake 1 thread waiting on cond (if any)

    def getAll(self, blocking=False):
        with self.cond:
            # If blocking is true, always return at least 1 item
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
        return items
10 голосов
/ 10 сентября 2014

Я думаю, что самый простой способ вывести все элементы из очереди - это:

def get_all_queue_result(queue):

    result_list = []
    while not queue.empty():
        result_list.append(queue.get())

    return result_list
9 голосов
/ 01 октября 2008

Я был бы очень удивлен, если бы вызов get_nowait() вызвал паузу, не возвращаясь, если список был пуст.

Может ли быть так, что вы публикуете большое количество (может быть, больших?) Элементов между проверками, что означает, что у принимающего потока есть большой объем данных для извлечения из Queue? Вы можете попробовать ограничить число, которое вы получаете в одной партии:

def queue_get_all(q):
    items = []
    maxItemsToRetreive = 10
    for numOfItemsRetrieved in range(0, maxItemsToRetreive):
        try:
            if numOfItemsRetrieved == maxItemsToRetreive:
                break
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

Это будет ограничивать получение потока до 10 пунктов одновременно.

1 голос
/ 18 февраля 2015

Если вы закончили запись в очередь, qsize должен выполнить свою задачу без необходимости проверять очередь для каждой итерации.

responseList = []
for items in range(0, q.qsize()):
    responseList.append(q.get_nowait())
1 голос
/ 01 октября 2008

Я вижу, что вы используете get_nowait (), который в соответствии с документацией "возвращает [s] элемент, если он сразу доступен, иначе выдает исключение Пусто"

Теперь вы можете выйти из цикла, когда выдается исключение Empty. Таким образом, если в очереди нет немедленного результата, ваша функция возвращает пустой список элементов.

Есть ли причина, по которой вы не используете метод get ()? Может случиться так, что get_nowait () завершится неудачно, потому что очередь обслуживает запрос put () в тот же момент.

0 голосов
/ 04 апреля 2019

Самый простой метод - использование понимания списка:

items = [q.get() for _ in range(q.qsize())]

Использование функции range обычно не одобряется, но я пока не нашел более простой способ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...