Я пытаюсь использовать asyncio, aiohttp, asyncpg для выполнения асинхронных запросов, анализа полученных данных и последующей записи данных в мою базу данных Postgres. В моей базе данных Postgres максимальное_соединение равно 100, и я запрашиваю информацию из примерно 8000 URL-адресов, каждый из которых затем запускает несколько операторов SQL через соединение с базой данных postgres.
Как установить ограничение на количество подключений? Я посмотрел на asyncpg.create_pool()
, но не знаю, хочу ли я этого.
Мой код для подключения к моей базе данных и выполнения операторов:
# Create coroutine which creates a pg connection awaiting the parsing of json data
async def pg_write(sem: Semaphore, url: str, session: ClientSession, **kwargs) -> None:
# create a connection to a postgres database 'streamdata'
conn = await asyncpg.connect('postgresql://username:password@localhost:5432/streamdata')
# try creating a table to store our data
try:
await conn.execute("""
CREATE TABLE IF NOT EXISTS streams(
id serial PRIMARY KEY,
station_num text NOT NULL,
value real NOT NULL,
obs_date date NOT NULL,
qualifier text,
UNIQUE (station_num, obs_date)
)
""")
except UniqueViolationError:
pass
# create data variable with type tuple which awaits the completion of the coroutine parse_json()
data = await(parse_json(sem=sem, url=url, session=session, **kwargs))
# insert variables from data tuple into the created table
for v in data[1]:
obs_date = v['dateTime']
obs_date_parse = datetime.datetime.strptime(obs_date[:10], '%Y-%m-%d')
try:
await conn.execute("""
INSERT INTO streams(station_num, value, obs_date, qualifier) VALUES($1, $2, $3, $4)
""", str(data[0]), float(v['value']), obs_date_parse, str(v['qualifiers']))
except UniqueViolationError:
await conn.execute("""
UPDATE streams SET value = $1, qualifier = $2
WHERE station_num = $3 AND obs_date = $4
""", float(v['value']), str(v['qualifiers']), str(data[0]), obs_date_parse)
print(f"Inserted data from station {data[0]}")
# close connection
await conn.close()
# Create wrapper which creates a ClientSession() object and a Semaphore() object and calls a task for each url supplied
async def main(urls: list, **kwargs) -> None:
sem = asyncio.Semaphore(800) # numbers tested: 1000 (3 errors resulted), 800 (0 errors resulted)
async with ClientSession() as session:
tasks = []
for url in urls:
tasks.append(
pg_write(sem=sem, url=url, session=session, **kwargs)
)
await asyncio.gather(*tasks)