asyncio + asyncpg + pandas: получить pandas.df с асинхронным выбором из db - ОШИБКА - PullRequest
0 голосов
/ 08 февраля 2019

Отредактировал мой код - СЕЙЧАС он работает. Я пытаюсь получить некоторую дату из моей базы данных Postgres через пул соединений asyncpg асинхронно.По сути, моя база данных содержит около 100 различных таблиц (на город), и я пытаюсь собрать все данные в один кадр как можно быстрее.

    import pandas as pd
    import asyncpg
    import asyncio
    from time import time


    def make_t():
        lst = []
        # iterator for sql tuple
        for i in ['a',
                  'b',
                  'c']:
            i1 = i
            sql = """
    SELECT
    '%s' as city,
    MAX(starttime) AS max_ts
    FROM
    "table_%s"
    """
            lst.append(sql % (i, i1))
        return tuple(lst)


    async def get_data(pool, sql):
        start = time()
        async with pool.acquire() as conn:
           stmt = await conn.prepare(sql)
           columns = [a.name for a in stmt.get_attributes()]
           data = await stmt.fetch()
           print(f'Exec time: {time() - start}')
           return pd.DataFrame(data, columns=columns)


    async def main():
        dsn = 'postgres://user:pass@127.0.0.1:5432/my_base'
        cT = ['city', 'max_ts']
        sqls = make_t()
        pool = await asyncpg.create_pool(dsn=dsn, max_size=50)
        start = time()
        tasks = []

        for sql in sqls:
            tasks.append(loop.create_task(get_data(pool, sql)))

        tasks = await asyncio.gather(*tasks)
        df = pd.DataFrame(columns=cT)
        for task in tasks:
            # form df from corutine results
            df = df.append(task.result())

        print(f'total exec time: {time() - start} secs')
        print('exiting main')
        return df


    loop = asyncio.get_event_loop()
    df = loop.run_until_complete(main())
    loop.close()

    print('exiting program')

Python 3.6.5 :: Anaconda, Inc.

Получает эту ошибку:

Traceback (последний вызов был последним): файл "", строка 319, в файле "/Users/fixx/anaconda3/lib/python3.6/asyncio/base_events.py ", строка 468, в run_until_complete, возвращает файл future.result ()" ", строка 308, в основном файле" /Users/fixx/anaconda3/lib/python3.6/asyncio/tasks.py ",строка 594, в наборе для аргумента в наборе (coros_or_futures): TypeError: unhashable тип: 'list'

Не могу понять, почему?Мои sqls в кортеже!

1 Ответ

0 голосов
/ 09 февраля 2019

asyncio.gather принимает сопрограммы в качестве отдельных аргументов, и вы отправляете ему список задач.Вы должны использовать оператор * для правильного вызова gather:

        tasks = await asyncio.gather(*tasks)
...