Очередь Python, ожидающая потока, прежде чем получить следующий элемент - PullRequest
2 голосов
/ 18 мая 2011

У меня есть очередь, которая всегда должна быть готова обрабатывать элементы, когда они добавляются в нее.Функция, которая запускается для каждого элемента в очереди, создает и запускает поток для выполнения операции в фоновом режиме, чтобы программа могла выполнять другие действия.

Однако функция, которую я вызываю для каждого элемента в очереди, просто запускает поток и затем завершает выполнение, независимо от того, завершен ли поток, который он начал.Из-за этого цикл переходит к следующему элементу в очереди, прежде чем программа завершит обработку последнего элемента.

Вот код, чтобы лучше продемонстрировать, что я пытаюсь сделать:

queue = Queue.Queue()
t = threading.Thread(target=worker)
t.start()

def addTask():
    queue.put(SomeObject())

def worker():
    while True:
        try:
            # If an item is put onto the queue, immediately execute it (unless 
            # an item on the queue is still being processed, in which case wait 
            # for it to complete before moving on to the next item in the queue)
            item = queue.get()
            runTests(item)
            # I want to wait for 'runTests' to complete before moving past this point
        except Queue.Empty, err:
            # If the queue is empty, just keep running the loop until something 
            # is put on top of it.
            pass

def runTests(args):
    op_thread = SomeThread(args)
    op_thread.start()
    # My problem is once this last line 't.start()' starts the thread, 
    # the 'runTests' function completes operation, but the operation executed
    # by some thread is not yet done executing because it is still running in
    # the background. I do not want the 'runTests' function to actually complete
    # execution until the operation in thread t is done executing.
    """t.join()"""
    # I tried putting this line after 't.start()', but that did not solve anything.
    # I have commented it out because it is not necessary to demonstrate what 
    # I am trying to do, but I just wanted to show that I tried it.

Некоторые примечания:

Это все работает в приложении PyGTK.Когда операция SomeThread завершена, она отправляет обратный вызов в графический интерфейс для отображения результатов операции.

Я не знаю, насколько это влияет на мою проблему, но я подумал, что это может бытьважно.

1 Ответ

7 голосов
/ 18 мая 2011

Основная проблема с потоками Python заключается в том, что вы не можете просто убить их - они должны согласиться на смерть .

Что вы должны сделать:

  1. Реализация потока как класса
  2. Добавление члена threading.Event, который очищается методом join и периодически проверяет основной цикл потока.Если он видит, что он очищен, он возвращается.Для этого переопределения threading.Thread.join, чтобы проверить событие, а затем вызвать Thread.join для самого себя
  3. Чтобы разрешить (2), сделать блок чтения из Queue с небольшим тайм-аутом.Таким образом, «время отклика» вашего потока на запрос на удаление будет тайм-аутом, и OTOH не происходит удушение процессора

Вот некоторый код из потока клиента сокета, который у меня есть, с той же проблемой блокировкив очереди:

class SocketClientThread(threading.Thread):
    """ Implements the threading.Thread interface (start, join, etc.) and
        can be controlled via the cmd_q Queue attribute. Replies are placed in
        the reply_q Queue attribute.
    """
    def __init__(self, cmd_q=Queue.Queue(), reply_q=Queue.Queue()):
        super(SocketClientThread, self).__init__()
        self.cmd_q = cmd_q
        self.reply_q = reply_q
        self.alive = threading.Event()
        self.alive.set()
        self.socket = None

        self.handlers = {
            ClientCommand.CONNECT: self._handle_CONNECT,
            ClientCommand.CLOSE: self._handle_CLOSE,
            ClientCommand.SEND: self._handle_SEND,
            ClientCommand.RECEIVE: self._handle_RECEIVE,
        }

    def run(self):
        while self.alive.isSet():
            try:
                # Queue.get with timeout to allow checking self.alive
                cmd = self.cmd_q.get(True, 0.1)
                self.handlers[cmd.type](cmd)
            except Queue.Empty as e:
                continue

    def join(self, timeout=None):
        self.alive.clear()
        threading.Thread.join(self, timeout)

Примечание self.alive и цикл в run.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...