Как передать пул соединений postgres работникам Dask с помощью psycopg2 или asyncpg? - PullRequest
1 голос
/ 20 марта 2020

Я хочу, чтобы мои работники Dask получали Postgres соединение с ThreadedConnectionPool, но при прохождении пула вот так

from psycopg2.pool import ThreadedConnectionPool

def worker_pg(n, pool) -> None:
    print(n)

work = db.from_sequence(range(4))
tcp = ThreadedConnectionPool(1, 800, "db_string")

work.map(worker_pg, pool=tcp).compute()

Я получаю ошибки сериализации, такие как:

TypeError: ('Could not serialize object of type ThreadedConnectionPool.', '<psycopg2.pool.ThreadedConnectionPool object at 0x7f99dc57b128>')

Кроме того, хотя я пробовал это с psycopg2, мне бы очень хотелось, чтобы это работало с asyncpg (из соображений производительности). Тем не менее, это добавляет морщины использования await и async из asyncio

import asyncio
import asyncpg

async def get_pool():
    p = await asyncpg.create_pool("db_string")
    return p

pool = asyncio.get_event_loop().run_until_complete(get_pool())

work.map(worker_pg, pool=pool).compute()

, хотя я, кажется, в конечном итоге с такими же типами ошибок, как

TypeError: ('Could not serialize object of type Pool.', '<asyncpg.pool.Pool object at 0x7fdee9127818>')

Любые предложения (или альтернативы?) Очень ценятся!

1 Ответ

0 голосов
/ 28 марта 2020

Как предложено в комментариях, вы можете рассмотреть вопрос о том, чтобы каждая из ваших задач открывала соединение с Postgres, выполняла запрос и затем закрывала это соединение.

К сожалению, Dask не может переместить активную базу данных связь между машинами. Эти объекты тесно связаны с процессом, в котором они запущены.

...