Python 3.6 асинхронная блокировка aioodbc - PullRequest
0 голосов
/ 03 июля 2018

Я надеялся использовать aioodbc с асинхронным семафором для вставки строк в базу данных. Ниже будут записаны некоторые строки в базу данных назначения, но, похоже, они заблокированы вокруг Семпахора value +1. Любое предложение о том, как переработать это или решить блок / спор?

Определение таблицы:

create table async_testing (
    insert_id int null
)

Асинхронный код:

import asyncio
import aioodbc

loop = asyncio.get_event_loop()

async def odbc_insert_worker(semaphore, value, conn):
    await semaphore.acquire()
    print("Acquire Semaphore")
    async with conn.cursor() as cur:
        await cur.execute('INSERT INTO async_testing VALUES (?)', value)
    print("Release Semaphore")
    semaphore.release()

async def db_main(loop, values):
    dsn="foo"

    values = list(values)
    db_semaphore = asyncio.Semaphore(value=15)

    async with aioodbc.create_pool(dsn=dsn, loop=loop, autocommit=True) as pool:
        async with pool.acquire() as conn:
            tasks = [odbc_insert_worker(db_semaphore, value, conn) for value in values]
            await asyncio.gather(*tasks)

fmt_vals = range(0,1000)

loop.run_until_complete(db_main(loop, fmt_vals))

1 Ответ

0 голосов
/ 03 июля 2018

Благодаря помощи @jettify в канале aiolibs это решение работает:

import asyncio
import aioodbc
from concurrent.futures import ThreadPoolExecutor

loop = asyncio.get_event_loop()

async def odbc_insert_worker(conn, value):
    async with conn.cursor() as cur:
        await cur.execute('insert into async_testing values (?)', value)

async def db_main(loop, values):
    dsn="foo"

    values = list(values)

    async with aioodbc.create_pool(dsn=dsn, loop=loop, executor=ThreadPoolExecutor(max_workers=3), autocommit=True) as pool:
        tasks = [do_insert(pool, value) for value in values]
        await asyncio.gather(*tasks)

async def do_insert(pool, value):
    async with pool.acquire() as conn:
        await odbc_insert_worker(conn, value)

fmt_vals = range(0,1000)

loop.run_until_complete(db_main(loop, fmt_vals))
...