Создать таблицу SQL из кадра данных dask, используя map_partitions и pd.df.to_sql - PullRequest
0 голосов
/ 24 января 2019

У Dask нет df.to_sql (), подобного pandas, и поэтому я пытаюсь воспроизвести эту функциональность и создать таблицу sql, используя для этого метод map_partitions. Вот мой код:

import dask.dataframe as dd
import pandas as pd
import sqlalchemy_utils as sqla_utils

db_url = 'my_db_url_connection'
conn = sqla.create_engine(db_url)

ddf = dd.read_csv('data/prod.csv')
meta=dict(ddf.dtypes)
ddf.map_partitions(lambda df: df.to_sql('table_name', db_url, if_exists='append',index=True), ddf, meta=meta)

Это возвращает мой объект dask dataframe, но когда я смотрю на свой psql-сервер, новой таблицы нет ... что здесь не так?

UPDATE Все еще не могу заставить это работать, но из-за независимой проблемы. Дополнительный вопрос: значение дублированного ключа нарушает уникальное ограничение - ошибка postgres при попытке создать таблицу sql из кадра данных dask

1 Ответ

0 голосов
/ 24 января 2019

Просто вы создали фрейм данных, который является предписанием работы, которую нужно выполнить, но вы не выполнили ее. Для выполнения необходимо вызвать .compute() на результат.

Обратите внимание, что вывод здесь на самом деле не является фреймом данных, каждый раздел оценивается как None (потому что to_sql не имеет вывода), так что было бы лучше выразить это с df.to_delayed, что-то вроде

dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'table_name', db_url, if_exists='append', index=True)
       for d in ddf.to_delayed()]
dask.compute(*out)

Также обратите внимание, что хороший параллелизм будет зависеть от драйвера базы данных и самой системы данных.

...