Потоки Python, как Event и Queue работают вместе? - PullRequest
0 голосов
/ 29 августа 2018

Я разговаривал с моим другом, посмотрев на пример из книги Бизли

class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        self._mailbox.put(msg)

    def recv(self):
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        self.send(ActorExit)

    def start(self):
        self._terminated = Event()
        t = Thread(target=self._bootstrap)
        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        while True:
            msg = self.recv()

class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)

Мой друг утверждает, что единственной целью Event является блокировка основного потока, пока другой поток не выполнит операцию set. Это правда? Как мы можем наблюдать за выполнением потока?

1 Ответ

0 голосов
/ 30 августа 2018

Темы Python, как Event и Queue работают вместе?

Они не. Вы можете использовать События без очередей и очереди без Событий, нет зависимости друг от друга. Ваш пример просто использует оба.

Мой друг утверждает, что единственной целью Event является блокировка основного потока, пока другой поток не выполнит операцию set. Это правда?

Вызов .wait() объекта-события заблокирует любой вызывающий поток, пока внутренний флаг не станет .set().

Если вы посмотрите на источник для Event, вы обнаружите, что Events просто состоят из переменной Condition с блокировкой и логического флага + методов для обработки и передачи (ожидающим потокам) изменений состояния этого флага.

class Event:

    """Class implementing event objects.
    Events manage a flag that can be set to true with the set() method and reset
    to false with the clear() method. The wait() method blocks until the flag is
    true.  The flag is initially false.
    """

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False
    ...

Как мы можем наблюдать за выполнением потока?

Простой метод - применить какую-то служебную функцию, которая выводит то, что вас интересует, например:

def print_info(info=""):
    """Print calling function's name and thread with optional info-text."""
    calling_func = sys._getframe(1).f_code.co_name
    thread_name = threading.current_thread().getName()
    print(f"<{thread_name}, {calling_func}> {info}", flush=True)

Другой возможностью было бы использовать ведение журнала, как в этом ответе .


Не уверен, что Бизли хотел продемонстрировать с помощью кода, который вы показали, но он кажется мне немного перегруженным для этой простой задачи. Вовлечение событий здесь сверху не требуется, когда вы уже используете очередь. Вы можете инициализировать завершение потока, передавая значение Sentinel.

Вот упрощенная версия вашего примера с часовым ('STOP') и некоторыми инфо-принтами с print_info сверху:

import sys
import time
import threading
from queue import Queue


class Actor(threading.Thread):

    def __init__(self):
        super().__init__(target=self.run)
        self.queue = Queue()

    def send(self, msg):
        self.queue.put(msg)
        print_info(f"sent: {msg}")  # DEBUG

    def close(self):
        print_info()  # DEBUG
        self.send('STOP')

    def run(self):
        for msg in iter(self.queue.get, 'STOP'):
            pass


class PrintActor(Actor):
    def run(self):
        for msg in iter(self.queue.get, 'STOP'):
            print_info(f"got: {msg}")  # DEBUG


if __name__ == '__main__':

    pa = PrintActor()
    pa.start()
    pa.send("Hello")
    time.sleep(2)
    pa.send("...World!")
    time.sleep(2)
    pa.close()
    pa.join()

Выход:

<MainThread, send> sent: Hello
<Thread-1, run> got: Hello
<MainThread, send> sent: ...World!
<Thread-1, run> got: ...World!
<MainThread, close> 
<MainThread, send> sent: STOP
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...