Ограничить количество подключений к базе данных Postgres с использованием asyncpg? - PullRequest
1 голос
/ 22 февраля 2020

Я пытаюсь использовать asyncio, aiohttp, asyncpg для выполнения асинхронных запросов, анализа полученных данных и последующей записи данных в мою базу данных Postgres. В моей базе данных Postgres максимальное_соединение равно 100, и я запрашиваю информацию из примерно 8000 URL-адресов, каждый из которых затем запускает несколько операторов SQL через соединение с базой данных postgres.

Как установить ограничение на количество подключений? Я посмотрел на asyncpg.create_pool(), но не знаю, хочу ли я этого.

Мой код для подключения к моей базе данных и выполнения операторов:

# Create coroutine which creates a pg connection awaiting the parsing of json data
async def pg_write(sem: Semaphore, url: str, session: ClientSession, **kwargs) -> None:
    # create a connection to a postgres database 'streamdata'
    conn = await asyncpg.connect('postgresql://username:password@localhost:5432/streamdata')
    # try creating a table to store our data
    try:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS streams(
                id serial PRIMARY KEY,
                station_num text NOT NULL,
                value real NOT NULL,
                obs_date date NOT NULL,
                qualifier text,
                UNIQUE (station_num, obs_date)
            )
        """)
    except UniqueViolationError:
        pass
    # create data variable with type tuple which awaits the completion of the coroutine parse_json()
    data = await(parse_json(sem=sem, url=url, session=session, **kwargs))
    # insert variables from data tuple into the created table
    for v in data[1]:
        obs_date = v['dateTime']
        obs_date_parse = datetime.datetime.strptime(obs_date[:10], '%Y-%m-%d')
        try:
            await conn.execute("""
                INSERT INTO streams(station_num, value, obs_date, qualifier) VALUES($1, $2, $3, $4)
            """, str(data[0]), float(v['value']), obs_date_parse, str(v['qualifiers']))
        except UniqueViolationError:
            await conn.execute("""
                UPDATE streams SET value = $1, qualifier = $2
                WHERE station_num = $3 AND obs_date = $4
            """, float(v['value']), str(v['qualifiers']), str(data[0]), obs_date_parse)
    print(f"Inserted data from station {data[0]}")
    # close connection
    await conn.close()


# Create wrapper which creates a ClientSession() object and a Semaphore() object and calls a task for each url supplied
async def main(urls: list, **kwargs) -> None:
    sem = asyncio.Semaphore(800)  # numbers tested: 1000 (3 errors resulted), 800 (0 errors resulted)
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                pg_write(sem=sem, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)
...