Правильно ли я использую aiohttp вместе с psycopg2? - PullRequest
0 голосов
/ 03 июля 2018

Я очень новичок в использовании asyncio / aiohttp, но у меня есть скрипт Python, который считывает пакет URL: s из таблицы Postgres, загружает URL: s, запускает функцию обработки при каждой загрузке (не относится к вопрос) и сохраняет результат обработки в таблицу.

В упрощенном виде это выглядит так:

import asyncio
import psycopg2
from aiohttp import ClientSession, TCPConnector

BATCH_SIZE = 100

def _get_pgconn():
    return psycopg2.connect()

def db_conn(func):
    def _db_conn(*args, **kwargs):
        with _get_pgconn() as conn:
            with conn.cursor() as cur:
                return func(cur, *args, **kwargs)
            conn.commit()
    return _db_conn

async def run():
    async with ClientSession(connector=TCPConnector(ssl=False, limit=100)) as session:
        while True:
            count = await run_batch(session)
            if count == 0:
                break

async def run_batch(session):
    tasks = []
    for url in get_batch():
        task = asyncio.ensure_future(process_url(url, session))
        tasks.append(task)

    await asyncio.gather(*tasks)
    results = [task.result() for task in tasks]
    save_batch_result(results)
    return len(results)

async def process_url(url, session):
    try:
        async with session.get(url, timeout=15) as response:
            body = await response.read()
            return process_body(body)
    except:
        return {...}

@db_conn
def get_batch(cur):
    sql = "SELECT id, url FROM db.urls WHERE processed IS NULL LIMIT %s"
    cur.execute(sql, (BATCH_SIZE,))
    return cur.fetchall()


@db_conn
def save_batch_result(cur, results):
    sql = "UPDATE db.urls SET a = %(a)s, processed = true WHERE id = %(id)s"
    cur.executemany(sql, tuple(results))


loop = asyncio.get_event_loop()
loop.run_until_complete(run())

Но у меня такое чувство, что я здесь что-то упускаю. Сценарий выполняется, но, похоже, он становится все медленнее и медленнее с каждым пакетом. Особенно похоже, что вызов функции process_url со временем замедляется. Кроме того, используемая память продолжает расти, так что я предполагаю, что может быть что-то, что я не могу правильно очистить между запусками?

У меня также есть проблемы с увеличением размера пакета, если я превышаю 200, я, похоже, получаю гораздо большую долю исключений из вызова на session.get. Я попытался поиграть с аргументом limit для TCPConnector, установив его как выше, так и ниже, но я не вижу, что это сильно помогает. Также пытались запустить его на нескольких разных серверах, но, кажется, это то же самое. Есть ли способ подумать о том, как установить эти значения более эффективно?

Был бы признателен за некоторые указания на то, что я мог бы сделать здесь неправильно!

1 Ответ

0 голосов
/ 31 июля 2018

Проблема вашего кода заключается в смешении асинхронной aiohttp библиотеки с синхронной psycopg2 клиентом.

Как следствие, вызовы БД блокируют цикл обработки событий, полностью затрагивая все другие параллельные задачи.

Для ее решения вам необходимо использовать асинхронный клиент БД: aiopg (обертка вокруг psycopg2 асинхронный режим) или asyncpg (у него другой API, но работает быстрее).

...