Python asyncio не показывает ошибок - PullRequest
1 голос
/ 06 января 2020

Я пытаюсь получить некоторые данные из тысяч URL-адресов с помощью asyncio. Вот краткий обзор дизайна:

  1. Заполните Queue в одном go кучей URL, используя один Producer
  2. Создайте кучу Consumers
  3. Каждый Consumer продолжает асинхронно извлекать URL-адреса из Queue и отправлять GET запросов
  4. Выполнить некоторую постобработку для результата
  5. Объединить все обработанные результаты и вернуть

Проблемы: asyncio почти никогда не показывает, если что-то не так, просто молча зависает без ошибок. Я помещал print заявления везде, чтобы самому обнаруживать проблемы, но это мало помогало.

В зависимости от количества входных URL-адресов и количества потребителей или ограничений, я мог бы получить эти ошибки:

  1. Task was destroyed but it is pending!
  2. task exception was never retrieved future: <Task finished coro=<consumer()
  3. aiohttp.client_exceptions.ServerDisconnectedError
  4. aiohttp.client_exceptions.ClientOSError: [WinError 10053] An established connection was aborted by the software in your host machine

Вопросы: как обнаружить и обработать исключения в asyncio? как повторить попытку, не нарушая Queue?

Ниже приведен мой код, который я скомпилировал, рассматривая различные примеры асин c кода. В настоящее время имеется преднамеренная ошибка в конце функции def get_video_title. При запуске ничего не появляется.

import asyncio
import aiohttp
import json
import re
import nest_asyncio
nest_asyncio.apply() # jupyter notebook throws errors without this


user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"

def get_video_title(data):
    match = re.search(r'window\[["\']ytInitialPlayerResponse["\']\]\s*=\s*(.*)', data)
    string = match[1].strip()[:-1]
    result = json.loads(string)
    return result['videoDetails']['TEST_ERROR'] # <---- should be 'title'

async def fetch(session, url, c):
    async with session.get(url, headers={"user-agent": user_agent}, raise_for_status=True, timeout=60) as r:
        print('---------Fetching', c)
        if r.status != 200:
            r.raise_for_status()
        return await r.text()

async def consumer(queue, session, responses):
    while True:
        try:
            i, url = await queue.get()
            print("Fetching from a queue", i)
            html_page = await fetch(session, url, i)

            print('+++Processing', i)
            result = get_video_title(html_page) # should raise an error here!
            responses.append(result)
            queue.task_done()

            print('+++Task Done', i)

        except (aiohttp.http_exceptions.HttpProcessingError, asyncio.TimeoutError) as e:
            print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Error', i, type(e))
            await asyncio.sleep(1)
            queue.task_done()

async def produce(queue, urls):
    for i, url in enumerate(urls):
        print('Putting in a queue', i)
        await queue.put((i, url))

async def run(session, urls, consumer_num):
    queue, responses = asyncio.Queue(maxsize=2000), []

    print('[Making Consumers]')
    consumers = [asyncio.ensure_future(
        consumer(queue, session, responses)) 
                 for _ in range(consumer_num)]

    print('[Making Producer]')
    producer = await produce(queue=queue, urls=urls)

    print('[Joining queue]')
    await queue.join()

    print('[Cancelling]')
    for consumer_future in consumers:
        consumer_future.cancel()

    print('[Returning results]')
    return responses

async def main(loop, urls):
    print('Starting a Session')
    async with aiohttp.ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=300)) as session:
        print('Calling main function')
        posts = await run(session, urls, 100)
        print('Done')
        return posts


if __name__ == '__main__':
    urls = ['https://www.youtube.com/watch?v=dNQs_Bef_V8'] * 100
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(loop, urls))

1 Ответ

1 голос
/ 07 января 2020

Проблема в том, что ваш consumer ловит только два очень определенных c исключения и в их случае помечает задачу как выполненную. Если произойдет любое другое исключение, например, связанное с сетью, оно прекратит работу потребителя. Однако это не обнаруживается run, который ожидает queue.join() с потребителем (эффективно) работающим в фоновом режиме. Вот почему ваша программа зависает - элементы в очереди никогда не учитываются, и очередь никогда не обрабатывается полностью.

Есть два способа исправить это, в зависимости от того, что вы хотите, чтобы ваша программа делала, когда она сталкивается с неожиданным исключение. Если вы хотите, чтобы он продолжал работать, вы можете добавить для потребителя предложение catch-all except, например:

        except Exception as e
            print('other error', e)
            queue.task_done()

Альтернативой является исключение необработанное потребителя для распространения. до run. Это должно быть организовано в явном виде, но имеет то преимущество, что никогда не допускает прохождения исключений без уведомления. (См. в этой статье для получения подробной информации о предмете.) Один из способов добиться этого - дождаться queue.join() и потребителей одновременно; так как потребители находятся в бесконечном l oop, они будут завершены только в случае исключения.

    print('[Joining queue]')
    # wait for either `queue.join()` to complete or a consumer to raise
    done, _ = await asyncio.wait([queue.join(), *consumers],
                                 return_when=asyncio.FIRST_COMPLETED)
    consumers_raised = set(done) & set(consumers)
    if consumers_raised:
        await consumers_raised.pop()  # propagate the exception

Вопросы: как обнаружить и обработать исключения в asyncio?

Исключения распространяются через await и обычно обнаруживаются и обрабатываются, как и в любом другом коде. Специальная обработка необходима только для обнаружения исключений, которые вытекают из «фоновой» задачи, такой как consumer.

, как повторить попытку, не нарушая очередь?

Вы можно звонить await queue.put((i, url)) в блоке except. Элемент будет добавлен в конец очереди, чтобы быть выбранным потребителем. В этом случае вам нужен только первый фрагмент кода, и вы не захотите пытаться распространить исключение в consumer на run.

...