Я хочу, чтобы мои работники Dask получали Postgres соединение с ThreadedConnectionPool
, но при прохождении пула вот так
from psycopg2.pool import ThreadedConnectionPool
def worker_pg(n, pool) -> None:
print(n)
work = db.from_sequence(range(4))
tcp = ThreadedConnectionPool(1, 800, "db_string")
work.map(worker_pg, pool=tcp).compute()
Я получаю ошибки сериализации, такие как:
TypeError: ('Could not serialize object of type ThreadedConnectionPool.', '<psycopg2.pool.ThreadedConnectionPool object at 0x7f99dc57b128>')
Кроме того, хотя я пробовал это с psycopg2
, мне бы очень хотелось, чтобы это работало с asyncpg
(из соображений производительности). Тем не менее, это добавляет морщины использования await
и async
из asyncio
import asyncio
import asyncpg
async def get_pool():
p = await asyncpg.create_pool("db_string")
return p
pool = asyncio.get_event_loop().run_until_complete(get_pool())
work.map(worker_pg, pool=pool).compute()
, хотя я, кажется, в конечном итоге с такими же типами ошибок, как
TypeError: ('Could not serialize object of type Pool.', '<asyncpg.pool.Pool object at 0x7fdee9127818>')
Любые предложения (или альтернативы?) Очень ценятся!