wait_for тайм-аут, чтобы прийти, asyncio.queue не работает тайм-аут - PullRequest
0 голосов
/ 15 марта 2020

Я пытался запустить следующий код. Код является примером добавления элемента в очередь из другого потока и исключения, когда происходит тайм-аут, если очередь не снабжена новыми элементами.

import asyncio
import threading
import time


async def responder(queue, value):
    value["value"] = await queue.get()


async def worker(queue):
    value = dict()
    while True:
        print("waiting")
        # new_msg = await queue.get()
        asyncio.wait_for(responder(queue, value), timeout=2)
        print(value)
        print("worked")


class wManager():
    def __init__(self):

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        self.queue = asyncio.Queue()

        self.loop.create_task(worker(self.queue))

        self.msg = 0

    def add(self):
        print("queue Size Before"+str(self.queue.qsize()))
        asyncio.run_coroutine_threadsafe(self.queue.put(self.msg), self.loop)

        self.msg += 1
        print("queue Size After"+str(self.queue.qsize()))

    def start(self):
        self.loop.run_forever()


def print_after_sleep(manager):

    while True:
        print("hi!")
        time.sleep(3)
        manager.add()


def manager_runer(manager):
    manager.start()


if __name__ == "__main__":
    manager = wManager()
    manager_thread = threading.Thread(target=manager_runer, args=(manager,), daemon=True)

    sleepy_thread = threading.Thread(
        target=print_after_sleep, args=(manager,), daemon=True)

    sleepy_thread.start()
    manager_thread.start()
    sleepy_thread.join()

Я хочу иметь возможность тайм-аута, если очередь не получает новые элементы по истечении заданного времени. Проблема в том, что responder() не ожидается.

Спасибо

1 Ответ

1 голос
/ 15 марта 2020

Вам не хватает await перед asyncio.wait_for(). Вы также, вероятно, захотите добавить try / catch для обработки TimeoutError, вызванного вызовом в случае тайм-аута.

...