Я обнаружил, что в определенных ситуациях проще всего справиться с этим, рассредоточив DataFrame
объекты в кластере с помощью pd.read_sql
и его chunksize
аргумента:
from dask import bag as db
sql_text = "SELECT ..."
sql_meta = {"column0": "object", "column1": "uint8"}
sql_conn = connect(...)
dfs_futs = map(client.scatter, # Scatter each object to the cluster
pd.read_sql(sql_text,
sql_conn,
chunksize=10_000, # Iterate in chunks of 10,000
columns=list(sql_meta.keys())))
# Now join our chunks (remotely) into a single frame.
df = db.from_sequence(list(dfs_futs)).to_dataframe(meta=sql_meta)
Это приятно поскольку вам не нужно обрабатывать какие-либо потенциальные драйверы / пакеты, которые было бы неудобно управлять на распределенных узлах и / или в ситуациях, когда трудно легко разделить ваши данные.
Просто примечание о производительности для моего использования В этом случае мы используем операции внешней таблицы нашей базы данных для буферизации данных в CSV, а затем читаем это с помощью pd.read_csv
(, это почти то же самое, что и выше ), а SELECT ... FROM ... WHERE
по сравнению с тем, как Dask распараллеливает и разбивает запросы на части, может быть приемлемым с точки зрения производительности, поскольку выполнение разбиения на фрагменты внутри базы данных требует затрат.