Как сбросить рабочий цикл asyncio? - PullRequest
0 голосов
/ 01 января 2019

Я работаю с asyncio forever() eventloop.Теперь я хочу перезапустить цикл (остановить цикл и воссоздать новый цикл) после процесса, или сигнала, или изменения в файле, но у меня есть некоторые проблемы для этого:


Воттри упрощенных кода фрагмента, которые демонстрируют некоторый рабочий сопрограммы и перезапуск цикла сопрограммы:


# 1-я попытка:

import asyncio

async def coro_worker(proc):
    print(f'Worker: {proc} started.')
    while True:
        print(f'Worker: {proc} process.')
        await asyncio.sleep(proc)

async def reset_loop(loop):
    # Some process
    for i in range(5):  # Like a process.
        print(f'{i} counting for reset the eventloop.')
        await asyncio.sleep(1)

    main(loop)  # Expected close the current loop and start a new loop!

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()
            # task.clear()
            # task.close()
            # task.stop()

        print("Done cancelling tasks")
        asyncio.get_event_loop().stop()

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()

Out [1]:

Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
  File "reset_asycio.py", line 40, in main
    loop.run_forever()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "reset_asycio.py", line 17, in reset_loop
    main(loop)  # Expected close the current loop and start a new loop!
  File "reset_asycio.py", line 48, in main
    loop.close()
  File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
    super().close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
    raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
  main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
  main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>

# 2 попробуйте:

.
.
.

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        previous_loop.stop()
        previous_loop.close()
        offset = 1  # An offset to change the process name.

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()

Out [2]:

Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
  File "reset_asycio.py", line 15, in reset_loop
    main(loop)  # Expected close the current loop and start new loop!
  File "reset_asycio.py", line 21, in main
    previous_loop.close()
  File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
    super().close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
    raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>

#3 попытки:

.
.
.

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()

Out [3]:

Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.

Проблема:

  • В # 3 попробуйте , по-видимому, я сделал это, но print('Cancel the tasks') увеличивается после каждого перезапуска, в чем причина?!

  • Есть ли лучший подход для преодоления этой проблемы?

Простите за длинный вопрос, который я пытался упростить!


[ПРИМЕЧАНИЕ]:

  • Я неищу asyncio.timeout()
  • Я также пытался с другим потоком, чтобы перезапустить eventloop с unsбезуспешно результат.
  • Я использую Python 3.6

1 Ответ

0 голосов
/ 02 января 2019

Рекурсивный вызов main() и новый цикл обработки событий добавляют ненужные сложности.Вот более простой прототип для игры - он отслеживает внешний источник (файловую систему) и, когда файл создается, просто останавливает цикл.main() содержит цикл, который заботится как о (повторном) создании, так и об отмене задач:

import os, asyncio, random

async def monitor():
    loop = asyncio.get_event_loop()
    while True:
        if os.path.exists('reset'):
            print('reset!')
            os.unlink('reset')
            loop.stop()
        await asyncio.sleep(1)

async def work(workid):
    while True:
        t = random.random()
        print(workid, 'sleeping for', t)
        await asyncio.sleep(t)

def main():
    loop = asyncio.get_event_loop()
    loop.create_task(monitor())
    offset = 0
    while True:
        workers = []
        workers.append(loop.create_task(work(offset + 1)))
        workers.append(loop.create_task(work(offset + 2)))
        workers.append(loop.create_task(work(offset + 3)))
        loop.run_forever()
        for t in workers:
            t.cancel()
        offset += 3

if __name__ == '__main__':
    main()

Другой вариант - никогда не останавливать цикл событий, а просто запускать событие сброса:

async def monitor(evt):
    while True:
        if os.path.exists('reset'):
            print('reset!')
            os.unlink('reset')
            evt.set()
        await asyncio.sleep(1)

В этом дизайне main() может быть сопрограммой:

async def main():
    loop = asyncio.get_event_loop()
    reset_evt = asyncio.Event()
    loop.create_task(monitor(reset_evt))
    offset = 0
    while True:
        workers = []
        workers.append(loop.create_task(work(offset + 1)))
        workers.append(loop.create_task(work(offset + 2)))
        workers.append(loop.create_task(work(offset + 3)))
        await reset_evt.wait()
        reset_evt.clear()
        for t in workers:
            t.cancel()
        offset += 3

if __name__ == '__main__':
    asyncio.run(main())
    # or asyncio.get_event_loop().run_until_complete(main())

Обратите внимание, что в обоих вариантах отмена задач осуществляется с помощью await, вызывающим исключение CancelledError.Задача не должна перехватывать все исключения, используя try: ... except: ..., и, если это так, должна повторно вызвать исключение.

...