Python asyncio: как безопасно закрыть цикл событий после Task.cancel среди потоков? - PullRequest
0 голосов
/ 18 декабря 2018

ВОПРОС

в моем собственном проекте я запускаю цикл обработки событий в потоке, затем отменяю задачу и безопасно закрываю цикл в другом потоке.но мне не удалось.

после прочтения объекта задачи , я до сих пор не могу понять, как ожидать, что задача действительно отменена после Task.cancel

Версия Python: 3.7.1

ОС: Windows

ниже - мой процесс отладки.

import threading
import asyncio
import time

async def visit_sth():
    print("start sleep")
    await asyncio.sleep(3)
    print("end sleep")


class Person(object):

    def __init__(self):
        self.loop = asyncio.new_event_loop()

    def visit(self):

        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(visit_sth())
        except Exception as err:
            print(err)

    def pause(self):

        tasks = asyncio.all_tasks(loop=self.loop)
        for t in tasks:
            t.cancel()

        self.loop.stop()
        self.loop.close()

P = Person()
t1 = threading.Thread(target=P.visit)
t2 = threading.Thread(target=P.pause)

t1.start()
time.sleep(0.5)
t2.start()

t1.join()
t2.join()

ошибки ниже

start sleep
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python3701\lib\threading.py", line 917, in _bootstrap_inner
    self.run()
  File "C:\Python3701\lib\threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "c:\Users\zhouxin\Documents\jupyterlab\learning_asyncio\starkoverflow.py", line 31, in pause
    self.loop.close()
  File "C:\Python3701\lib\asyncio\selector_events.py", line 94, in close
    raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop

после отмены цикл обработки событий все еще работает.

также, документ объект задачи сказал Task.cancel () не гарантирует, что задача будет отменена

, поэтому я перехожу кstackoverflow и читайте Убивайте задачи вместо того, чтобы ждать их , и изменяйте pause , например

def pause(self):

    tasks = asyncio.all_tasks(loop=self.loop)
    for t in tasks:
        t.cancel()
        with suppress(asyncio.CancelledError):
            self.loop.run_until_complete(t)

    self.loop.stop()
    self.loop.close()

, произошла другая ошибка

start sleep
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python3701\lib\threading.py", line 917, in _bootstrap_inner
    self.run()
  File "C:\Python3701\lib\threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "c:\Users\zhouxin\Documents\jupyterlab\learning_asyncio\starkoverflow.py", line 31, in pause
    self.loop.run_until_complete(t)
  File "C:\Python3701\lib\asyncio\base_events.py", line 560, in run_until_complete
    self.run_forever()
  File "C:\Python3701\lib\asyncio\base_events.py", line 515, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

thisпуть не сработал.

и теперь я действительно запутался в том, как дождаться отмены задачи и затем закрыть цикл после Task.cancel.

1 Ответ

0 голосов
/ 19 декабря 2018

Есть несколько проблем с вашим кодом:

  • pause взаимодействует с циклом событий извне потока, который выполняет цикл обработки событий.Это запрещено и должно быть заменено вызовами run_coroutine_threadsafe и call_soon_threadsafe.

  • Код создает новый цикл обработки событий для бизнес-объекта.Это нежелательно, потому что сильный набор asyncio состоит в том, что он допускает множество сопрограмм в одном цикле событий.

Рекомендуемый шаблон - иметь один цикл событий и отправлять ему задачи с помощьюrun_coroutine_threadsafe.Вместо того, чтобы останавливать весь цикл, когда вы хотите отменить задачу, вы просто отменяете эту конкретную задачу.Например:

import threading
import asyncio
import time

async def visit_sth():
    print("start sleep")
    await asyncio.sleep(3)
    print("end sleep")


class Person:
    def visit(self):
        # returns a concurrent.futures.Future
        self._visit_fut = asyncio.run_coroutine_threadsafe(visit_sth(), loop)
        # result() waits for the future to be resolved and returns the
        # result
        try:
            return self._visit_fut.result()
        except Exception as err:
            print(type(err), err)

    def pause(self):
        self._visit_fut.cancel()

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever).start()

P = Person()
t1 = threading.Thread(target=P.visit)
t2 = threading.Thread(target=P.pause)

t1.start()
time.sleep(0.5)
t2.start()

t1.join()
t2.join()
loop.call_soon_threadsafe(loop.stop)
...