Как перезапустить поток asyncio? - PullRequest
0 голосов
/ 07 мая 2020

Как я могу перезапустить asyncio l oop? Я слушаю веб-сокет с asyncio. Я хочу перестать слушать и перезапустить весь l oop. Как я могу это сделать? Моя попытка ниже не работает

async def start_websocket(streams):
    print("using streams {}".format(streams))
    await asyncio.sleep(30)


def _start_loop(loop, ws):
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(ws)
    except CancelledError:
        pass


for streams in ["a", "b"]:
    ws = start_websocket(streams)  # coroutine
    loop = asyncio.get_event_loop()

    # in case of already running, cancel websocket
    if loop.is_running():
        [t.cancel() for t in asyncio.Task.all_tasks()]

    # restart websocket
    Thread(target=_start_loop, args=(loop, ws)).start()
    time.sleep(2)

Я получаю

RuntimeError: This event loop is already running

Ответы [ 3 ]

0 голосов
/ 07 мая 2020
    print("using streams {}".format(streams))
    await asyncio.sleep(30)


def _start_loop(loop, ws):
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(ws)
    except CancelledError:
        pass


for streams in ["a", "b"]:
    ws = start_websocket(streams)  # coroutine
    loop = asyncio.get_event_loop()

    # in case of already running, cancel websocket
    if loop.is_running():
        [t.cancel() for t in asyncio.Task.all_tasks()]
    try:
      loop.close() #docs say "The loop must not be running when this function is called. "
    except:
      loop.stop() 
    # restart websocket
    Thread(target=_start_loop, args=(loop, ws)).start()
    time.sleep(2)

Это должно исправить ранее полученную ошибку

0 голосов
/ 07 мая 2020

async def subtask(s):
    while True:
        print("running task {}".format(s))
        await asyncio.sleep(5)

async def start_websocket(streams):
    print("using streams {}".format(streams))
    asyncio.ensure_future(subtask(streams+"1"))
    asyncio.ensure_future(subtask(streams+"2"))
    try:
        while True:
            print("running task {}".format(streams))
            await asyncio.sleep(5)
    except CancelledError:
        print("cancelled task {}".format(streams))

def _start_loop(loop):

    asyncio.set_event_loop(loop)
    loop.run_forever()


for streams in ["a", "b", "c"]:
    loop = asyncio.get_event_loop()
    if not loop.is_running():
        Thread(target=_start_loop, args=(loop,)).start()
    else:
        for t in asyncio.Task.all_tasks():
            t.cancel()

    loop.create_task(start_websocket(streams))
    time.sleep(10)
    print("finished {}".format(streams))

time.sleep(60)
0 голосов
/ 07 мая 2020

Вы потенциально можете остановить l oop (loop.stop() или loop.close()) и создать новый через loop = asyncio.new_event_loop()

Другой вариант - создать настраиваемую политику l oop события : L oop Полис

...