Как запустить SQLAlchemy Query на Dask Distributed? - PullRequest
0 голосов
/ 26 апреля 2020

Я пытаюсь запустить и распараллелить этот запрос sqlalchemy, используя кластер dask, который я настроил, так как у меня недостаточно памяти для его выполнения с локального компьютера.

Мой код выглядит следующим образом - я не уверен, что это лучший способ выполнить sh это:

from dask.distributed import Client
import dask.dataframe as dd
from dask.delayed import delayed
client = Client(<IP Address>)

recent_dates = ['2020-04-24', '2020-04-23', 2020-04-22']

query = """SELECT * FROM table WHERE date = '%s'"""
queries = [query.format(d) for d in recent_dates]

from sqlalchemy.engine import create_engine
conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                           connect_args={'protocol': 'https',
                                         'requests_kwargs': {'verify': key}})

con = engine.connect()
df = dd.from_delayed([delayed(pd.read_sql_query)(q, conn) for q in queries])

Я получаю следующую ошибку:

TypeError: can't pickle _thread.RLock objects

1 Ответ

1 голос
/ 27 апреля 2020

Вам следует использовать функцию read_sql_table, которая создана именно для этой цели. Если вы прочтете строки документации и / или код, вы увидите, что это сам запрос, который передается рабочим, которые создают свои собственные экземпляры движка локально. Это связано с тем, что экземпляр sqlalchemy имеет состояние, которое не может быть отправлено между работниками, как вы обнаружили.

Обратите внимание, что read_sql_table также заботится о разделении ваших данных, потому что это Dask, и весь смысл в том, чтобы иметь дело с данные больше чем память. В вашем примере, я предполагаю, что столбец индекса / разбиения равен date, и вы хотите передать «деления», на которые нужно разделить явно.

...