Завершение очереди в будущем - PullRequest
0 голосов
/ 26 ноября 2018

Я пишу веб-сервер Tornado на Python 3.7 для отображения состояния процессов, выполняемых библиотекой multiprocessing.

Следующий код работает, но я бы хотел сделать это с помощью Tornadoвстроенная библиотека вместо взлома в библиотеке потоков.Я не понял, как это сделать, не блокируя «Торнадо» во время queue.get.Я думаю, что правильное решение - обернуть вызовы get в какое-то будущее.Я пытался часами, но не понял, как это сделать.

Внутри моего многопроцессорного скрипта:

class ProcessToMonitor(multiprocessing.Process)

def __init__(self):
    multiprocessing.Process.__init__(self)
    self.queue = multiprocessing.Queue()

def run():
    while True:
        # do stuff
        self.queue.put(value)

Затем в моем скрипте Торнадо

class MyWebSocket(tornado.websocket.WebSocketHandler):
    connections = set()

    def open(self):
        self.connections.add(self)

    def close(self):
        self.connections.remove(self)

    @classmethod
    def emit(self, message):
        [client.write_message(message) for client in self.connections]

def worker():
    ptm = ProcessToMonitor()
    ptm.start()
    while True:
        message = ptm.queue.get()
        MyWebSocket.emit(message)

if __name__ == '__main__':
    app = tornado.web.Application([
        (r'/', MainHandler), # Not shown
        (r'/websocket', MyWebSocket)
    ])
    app.listen(8888)

    threading.Thread(target=worker)

    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.start()

1 Ответ

0 голосов
/ 26 ноября 2018

queue.get не является функцией блокировки, она просто ждет, пока в очереди не появится элемент, если очередь пуста.Из вашего кода я вижу, что queue.get идеально подходит для вашего случая использования внутри цикла while.

Я думаю, что вы, вероятно, используете его неправильно.Вам нужно сделать функцию worker сопрограммой (синтаксис async / await):

async def worker():
    ...
    while True:
        message = await queue.get()
        ...

Однако, если вы не хотите ждать элемента ихотел бы продолжить немедленно, его альтернатива - queue.get_nowait.

Следует отметить, что queue.get_nowait вызовет исключение под названием QueueEmpty, еслиочередь пустаИтак, вам нужно обработать это исключение.

Пример:

while True:
    try:
        message = queue.get_nowait()
    except QueueEmpty:
        # wait for some time before
        # next iteration
        # otherwise this loop will
        # keep running for no reason

    MyWebSocket.emit(message)

Как вы можете видеть, вам придется использовать цикл паузы в течение некоторого времени, если очередь пуста, чтобы предотвратить перегрузку системы.

Так почему бы не использовать queue.get во-первых?

...