Задача Python Asyncio выполняется без собираются () - PullRequest
0 голосов
/ 30 ноября 2018

Я пытался воспроизвести и лучше понять пример TaskPool в этом блоге Кристиана Гарсиа, и я столкнулся с очень интересным результатом.

Вот два сценария, которые яиспользуемый.Я заменил фактический сетевой запрос случайным сном

#task_pool.py
import asyncio

class TaskPool(object):

    def __init__(self, workers):
        self._semaphore = asyncio.Semaphore(workers)
        self._tasks = set()

    async def put(self, coro):
        await self._semaphore.acquire()
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task):
        self._tasks.remove(task)
        self._semaphore.release()

    async def join(self):
        await asyncio.gather(*self._tasks)

    async def __aenter__(self):
        return self

    def __aexit__(self, exc_type, exc, tb):
        print("aexit triggered")
        return self.join()

И

# main.py
import asyncio
import sys
from task_pool import TaskPool
import random
limit = 3

async def fetch(i):
    timereq = random.randrange(5)
    print("request: {} start, delay: {}".format(i, timereq))
    await asyncio.sleep(timereq)
    print("request: {} end".format(i))
    return (timereq,i)

async def _main(total_requests):
    async with TaskPool(limit) as tasks:
        for i in range(total_requests):
            await tasks.put(fetch(i))

loop = asyncio.get_event_loop()
loop.run_until_complete(_main(int(sys.argv[1])))

Команда main.py 10 на Python 3.7.1 дает следующий результат.

request: 0 start, delay: 3
request: 1 start, delay: 3
request: 2 start, delay: 3
request: 0 end
request: 1 end
request: 2 end
request: 3 start, delay: 4
request: 4 start, delay: 1
request: 5 start, delay: 0
request: 5 end
request: 6 start, delay: 1
request: 4 end
request: 6 end
request: 7 start, delay: 1
request: 8 start, delay: 4
request: 7 end
aexit triggered
request: 9 start, delay: 1
request: 9 end
request: 3 end
request: 8 end

У меня есть несколько вопросов, основанных на этом результате.

  1. Я бы не ожидал, что задачи будут выполняться, пока менеджер контекста не выйдет и не сработает __aexit__, потому что это единственный триггердля asyncio.gather.Однако заявления о печати настоятельно предполагают, что задания fetch выполняются еще до aexit.Что именно происходит?Работают ли задачи?Если так, то с чего они начались?
  2. Относится к (1).Почему диспетчер контекста завершает работу до того, как все задания возвращаются?
  3. Задание fetch должно возвращать кортеж.Как я могу получить доступ к этому значению?Я полагаю, что для веб-приложения разработчик может захотеть выполнить операции с данными, возвращаемыми веб-сайтом.

Любая помощь приветствуется!

1 Ответ

0 голосов
/ 30 ноября 2018
  1. Задание запускается, как только вызывается create_task.

    Прямо из документации, первая строка:

    Заверните сопрограмму coro в задание и запланируйте его выполнение.

  2. это не должно, но.Посмотрите на код в вашем вопросе:

    def __aexit__(self, exc_type, exc, tb):
        print("aexit triggered")
        return self.join()
    

    Есть три проблемы:

    • Это обычная синхронная функция.Измените его на async def и добавьте обязательное await для вызова self.join().Здесь вы не вызываете join, вы просто создаете задачу, но никогда не запускаете ее.Ваш питон наверняка жалуется на то, что вы никогда не ожидаете задания. Эти предупреждения никогда не следует игнорировать , потому что они означают, что в вашей программе что-то идет не так.

      [edit:] , как указано пользователем 4815162342 ниже, конструкция, которую вы написалина самом деле будет работать, хотя, вероятно, не по намеченным причинам - это работает, потому что функция сопрограммы, возвращаемая путем вызова self.join(), не ожидая ее, будет возвращена и использована, как если бы она была собственной.Вы не хотите этого, сделайте его асинхронным и ожидайте.

    • Как только это будет исправлено, __aexit__ выведет «aexit triggered» и затем вызовет join, который ожидает выполнения задач.Поэтому сообщения от задач, еще не выполненных, появятся после сообщения «aexit triggered».

    • Возвращаемое значение __aexit__ игнорируется, если только выход не произошел из-за возникновения исключения.В этом случае return True проглотит исключение.Отбросьте return

    Таким образом, фиксированная часть:

    async def __aexit__(self, exc_type, exc, tb):
        print("aexit triggered")
        await self.join()
        print("aexit completed")
    
  3. Ваш TaskPool должен получить результатзадачи доступны.Это ваш дизайн, Python не будет творить магию под капотом.Из того, что у вас есть, простым способом было бы join сохранить результат gather в качестве атрибута пула задач.

...