преобразовать dask.bag словарей в dask.dataframe, используя dask.delayed и pandas.DataFrame - PullRequest
2 голосов
/ 22 марта 2019

Я изо всех сил пытаюсь преобразовать dask.bag словарей в dask.delayed pandas.DataFrames в окончательный dask.dataframe

У меня есть одна функция (make_dict), которая считывает файлы в довольно сложную вложенную словарную структуру, и другая функция (make_df), чтобы превратить эти словари в pandas.DataFrame (результирующий кадр данных составляет около 100 МБ для каждого файла). Я хотел бы добавить все кадры данных в один dask.dataframe для дальнейшего анализа.

До сих пор я использовал dask.delayed объекты для загрузки, преобразования и добавления всех данных, которые работают нормально (см. Пример ниже). Однако для дальнейшей работы я хотел бы сохранить загруженные словари в dask.bag, используя dask.persist().

Мне удалось загрузить данные в dask.bag, в результате чего появился список диктов или список pandas.DataFrame, который я могу использовать локально после вызова compute(). Когда я попытался превратить dask.bag в dask.dataframe, используя to_delayed(), я застрял с ошибкой (см. Ниже).

Такое ощущение, что я здесь упускаю что-то довольно простое или, возможно, мой подход к dask.bag неверен?

В приведенном ниже примере показан мой подход с использованием упрощенных функций и выдает ту же ошибку. Любой совет о том, как справиться с этим, приветствуется.

import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag

print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2

def make_dict(n=1):
    return {"name":"dictionary","data":{'A':np.arange(n),'B':np.arange(n)}}

def make_df(d):
    return pd.DataFrame(d['data'])

k = [1,2,3]

# using dask.delayed
dfs = []
for n in k:
    delayed_1 = dask.delayed(make_dict)(n)
    delayed_2 = dask.delayed(make_df)(delayed_1)
    dfs.append(delayed_2)
ddf1 = dask.dataframe.from_delayed(dfs).compute() # this works as expected

# using dask.bag and turning bag of dicts into bag of DataFrames
b1 = dask.bag.from_sequence(k).map(make_dict)
b2 = b1.map(make_df)

df = pd.DataFrame().append(b2.compute()) # <- I would like to do this using delayed dask.DataFrames like above
ddf2 = dask.dataframe.from_delayed(b2.to_delayed()).compute() # <- this fails

# error:
# ValueError: Expected iterable of tuples of (name, dtype), got [   A  B
# 0  0  0]

что я в конечном итоге хотел бы сделать с помощью распределенного планировщика:

b = dask.bag.from_sequence(k).map(make_dict)
b = b.persist()
ddf = dask.dataframe.from_delayed(b.map(make_df).to_delayed())

1 Ответ

0 голосов
/ 27 марта 2019

В случае мешков отложенные объекты указывают на списки элементов, поэтому у вас есть список списков данных pandas, что не совсем то, что вам нужно. Две рекомендации

  1. Просто придерживайтесь dask.delayed. Кажется, у вас хорошо работает
  2. Используйте метод Bag.to_dataframe , который ожидает пакет с данными и выполняет само преобразование кадра данных
...