Могу ли я получить результат асинхронной 'экранированной' задачи, которая была прервана в wait_for () TimeOut - PullRequest
0 голосов
/ 04 июня 2018

Помогите мне, пожалуйста, разобраться в некоторых вещах asyncio.Я хочу понять, если это возможно сделать следующим образом:

У меня есть синхронная функция, которая, например, создает некоторые данные в удаленном API (API может возвращать успех или неудачу):

def sync_func(url):
   ... do something
   return result

У меня естьсопрограмма для запуска этой операции синхронизации в executor:

async def coro_func(url)
    loop = asyncio.get_event_loop()
    fn = functools.partial(sync_func, url)
    return await loop.run_in_executor(None, fn)

Далее я хочу сделать что-то вроде

  1. Если удаленный API не отвечает в течение 1 секунды, я хочу запустить следующий URLдля обработки, но я хочу знать результат первой задачи (когда API, наконец, отправит ответ), которая была прервана по таймауту.Я оборачиваю coro_func () в щит (), чтобы избежать его отмены.Но не представляю, как я могу проверить результат после ...

list_of_urls = [url1, ... urlN] map_of_task_results = {} async def task_processing(): for url in list_of_urls: res = asyncio.wait_for(shield(coro_func(url), timeout=1)) if res == 'success': return res break else: map_of_task_results[url] = res return "all tasks were processed"

PS Когда я пытаюсь получить доступ к результату щита (coro) - он имеетИсключение CancelledError ... но я ожидаю, что может быть результат, потому что я "экранировал" задачу.

try: task = asyncio.shield(coro_func(url)) result = await asyncio.wait_for(task, timeout=API_TIMEOUT) except TimeoutError as e: import ipdb; ipdb.set_trace() pending_tasks[api_details['api_url']] = task

ipdb> task
<Future cancelled created at 
/usr/lib/python3.6/asyncio/base_events.py:276>
ipdb> task.exception
<built-in method exception of _asyncio.Future object at 0x7f7d41eeb588>
ipdb> task.exception()

*** concurrent.futures._base.CancelledError

`

Ответы [ 2 ]

0 голосов
/ 04 июня 2018

Хорошо, спасибо @ user4815162342 Я выяснил, как обрабатывать задачи, которые были прерваны по таймауту - в общем, мое решение теперь выглядит так:

def sync_func(url):
   ... do something probably long
   return result

async def coro_func(url)
    loop = asyncio.get_event_loop()
    fn = functools.partial(sync_func, url)
    return await loop.run_in_executor(None, fn)

async def waiter(pending_tasks):
    count = 60
    while not all(map(lambda x: x.done(), pending_tasks.values())) and count > 0:
        logger.info("Waiting for pending tasks..")
        await asyncio.sleep(1)
        count -= 1

    # Finally process results those were in pending 
    print([task.result() for task in pending_tasks.values()])

async def task_processing(...):
    list_of_urls = [url1, ... urlN]
    pending_tasks = {}

    for url in list_of_urls:
        try:
            task = asyncio.Task(coro_func(url))
            result = await asyncio.wait_for(asyncio.shield(task), timeout=API_TIMEOUT)
        except TimeoutError as e:
            pending_tasks[url] = task

        if not result or result != 'success':
            continue
        else:
            print('Do something good here on first fast success, response to user ASAP in my case.')
            break

    # here start of pending task processing
    loop = asyncio.get_event_loop()
    loop.create_task(waiter(pending_tasks))

Итак, я собираю задачи, которые были прерваны одновременно.future.TimeoutError в объекте dict mapping, затем я запускаю задачу с помощью waiter () coro, который пытается подождать 60 секунд, в то время как ожидающие задачи получат статус выполненных или истечет 60 секунд.

В дополнение к словам, мой код, помещенный в RequestHandler и торнадо Tornado, использует цикл событий asyncio.Поэтому после того, как N попытается получить быстрый ответ от одного URL-адреса из списка URL-адресов, я смогу ответить пользователю и не потерять результаты задач, которые были инициированы и прерваны с помощью TimeoutError.(Я могу обработать их после того, как отвечу пользователю, так что это была моя основная идея)

Надеюсь, это сэкономит много времени для тех, кто ищет то же самое:)

0 голосов
/ 04 июня 2018

Если вы создаете будущее (задание) из своей сопрограммы перед тем, как ее защитить, вы всегда можете проверить это позже.Например:

coro_task = loop.create_task(coro_func(url))
try:
    result = await asyncio.wait_for(asyncio.shield(coro_task), API_TIMEOUT)
except asyncio.TimeoutError:
    pending_tasks[api_details['api_url']] = coro_task

Вы можете использовать coro_task.done(), чтобы проверить, выполнено ли задание за это время, и вызвать result(), если это так, или await если нет.При необходимости вы даже можете снова использовать shield / wait_for и т. Д.

...