Исключение задачи с асинхронным запросом aiohttp - PullRequest
1 голос
/ 03 мая 2019

Я пытаюсь ускорить несколько запросов на получение к веб-службе, используя asyncio и aiohttp.

Для этого я извлекаю свои данные из базы данных postgresql с помощью модуля psycopg2 .fetchmany () внутри функции и создаю словарь из 100 записей для отправки в виде списков URL-адресов словаря в асинхронную функцию с именем batch (). партия за процессом.

Проблема, с которой я сталкиваюсь в функции batch (), заключается в том, что некоторые запросы регистрируют сообщение, указанное ниже, хотя сценарий продолжается и не завершается с ошибкой, но я не могу перехватить и зарегистрировать эти исключения для последующей их повторной обработки.

Task exception was never retrieved
future: <Task finished coro=<batch.<locals>.fetch() done, defined at C:/PythonProjects/bindings/batch_fetch.py:34> exception=ClientOSError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)>
Traceback (most recent call last):
  File "C:/PythonProjects/bindings/batch_fetch.py", line 36, in fetch
    async with session.get(url) as resp:
  File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 1005, in __aenter__
    self._resp = await self._coro
  File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 497, in _request
    await resp.start(conn)
  File "C:\Miniconda3\lib\site-packages\aiohttp\client_reqrep.py", line 844, in start
    message, payload = await self._protocol.read()  # type: ignore  # noqa
  File "C:\Miniconda3\lib\site-packages\aiohttp\streams.py", line 588, in read
    await self._waiter
aiohttp.client_exceptions.ClientOSError: [WinError 10054] An existing connection was forcibly closed by the remote host
Task exception was never retrieved
future: <Task finished coro=<batch.<locals>.fetch() done, defined at C:/PythonProjects/bindings/batch_fetch.py:34> exception=ClientConnectorError(10060, "Connect call failed ('xx.xxx.xx.xxx', 80)")>
Traceback (most recent call last):
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 924, in _wrap_create_connection
    await self._loop.create_connection(*args, **kwargs))
  File "C:\Miniconda3\lib\asyncio\base_events.py", line 778, in create_connection
    raise exceptions[0]
  File "C:\Miniconda3\lib\asyncio\base_events.py", line 765, in create_connection
    yield from self.sock_connect(sock, address)
  File "C:\Miniconda3\lib\asyncio\selector_events.py", line 450, in sock_connect
    return (yield from fut)
  File "C:\Miniconda3\lib\asyncio\selector_events.py", line 480, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 10060] Connect call failed ('xx.xxx.xx.xxx', 80)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/PythonProjects/bindings/batch_fetch.py", line 36, in fetch
    async with session.get(url) as resp:
  File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 1005, in __aenter__
    self._resp = await self._coro
  File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 476, in _request
    timeout=real_timeout
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 522, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 854, in _create_connection
    req, traces, timeout)
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 992, in _create_direct_connection
    raise last_exc
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 974, in _create_direct_connection
    req=req, client_error=client_error)
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 931, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host cms-uat.cme.in.here.com:80 ssl:None [Connect call failed ('xx.xxx.xx.xxx', 80)]

Я просто вхожу в асинхронный мир, как вы можете изобразить из моего кода, так что все советы по подходу полного кода для этого сценария очень приветствуются.

Спасибо

полный код ниже.

import psycopg2.extras
import asyncio
import json
from aiohttp import ClientSession
from aiohttp import TCPConnector

base_url = 'http://url-example/{}'

def query_db():
    urls = []
    # connection to postgres table , fetch data.
    conn = psycopg2.connect("dbname='pac' user='user' host='db'")
    cursor = conn.cursor('psycopg2 request', cursor_factory=psycopg2.extras.NamedTupleCursor)
    sql = "select gid, paid from table"
    cursor.execute(sql)
    while True:
        rec = cursor.fetchmany(100)

        for item in rec:
            record = {"gid": item.gid, "url": base_url.format(item.paid)}
            urls.append(record.get('url'))
        if not rec:
            break
        # send batch for async batch request
        batch(urls)
        # empty list of urls for new async batch request
        urls = []


def batch(urls):
    async def fetch(url):
        async with ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status == 200:
                    response = await resp.json()
                    # parse the url to fetch the point address id.
                    paid = str(resp.request_info.url).split('/')[4].split('?')[0]
                    # build the dictionary with pa id and full response.
                    resp_dict = {'paid': paid, 'response': response}
                    with open('sucessful.json', 'a') as json_file:
                        json.dump(resp_dict, json_file)
                        json_file.write("\n")
                elif resp.status is None:
                    print(resp.status)
                elif resp.status != 200:
                    print(resp.status)
                    response = await resp.json()
                    # parse the url to fetch the paid.
                    paid = str(resp.request_info.url).split('/')[4].split('?')[0]
                    # build the dictionary with paid and full response.
                    resp_dict = {'paid': paid, 'response': response}
                    with open('failed.json', 'a') as json_file:
                        json.dump(resp_dict, json_file)
                        json_file.write("\n")

    loop = asyncio.get_event_loop()

    tasks = []

    for url in urls:
        task = asyncio.ensure_future(fetch(url))
        tasks.append(task)
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except Exception:
        print("exception consumed")


if __name__ == "__main__":
    query_db()

1 Ответ

1 голос
/ 03 мая 2019

Исключение задачи не было получено

Вы видите это предупреждение, когда вы создали какое-то задание, оно завершилось с исключением, но вы никогда явно не получали (ожидали) его результат. Вот связанный раздел документа .

Бьюсь об заклад, в вашем случае проблема с линией

loop.run_until_complete(asyncio.wait(tasks))

asyncio.wait() по умолчанию просто ждет, когда все tasks будут выполнены. Он не различает задачи, выполненные нормально или за исключением, он просто блокирует, пока все не закончится. В этом случае ваша задача - извлекать исключения из завершенных задач, и следующая часть вам не поможет, поскольку asyncio.wait() никогда не вызовет ошибку:

try:
    loop.run_until_complete(asyncio.wait(tasks))
except Exception:
    print('...')  # You will probably NEVER see this message

Если вы хотите поймать ошибку, как только она произошла в одной из задач, я могу посоветовать вам использовать asyncio.gather(). По умолчанию это вызовет первое возникшее исключение. Тем не менее, обратите внимание, что именно вы должны отменить отложенные задачи, если вы хотите их постепенного завершения.

...