Конкатенация фрейма данных dask и фрейма данных pandas - PullRequest
0 голосов
/ 15 февраля 2019

У меня есть фрейм данных dask (df) с примерно 250 миллионами строк (из файла CSV 10 Гб).У меня есть другой кадр данных панд (ndf) из 25 000 строк.Я хотел бы добавить первый столбец pandas dataframe в dask dataframe, повторяя каждый элемент 10000 раз каждый.

Вот код, который я пробовал.Я уменьшил проблему до меньшего размера.

import dask.dataframe as dd
import pandas as pd
import numpy as np

pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv")
df = dd.read_csv("tempfile.csv")
ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))
df['Node'] = np.repeat(ndf[0], 10)

С этим кодом я получаю сообщение об ошибке.

ValueError: Не все деления известны, не можетвыравнивать разделы.Пожалуйста, используйте set_index для установки индекса.

Я могу выполнить reset_index(), а затем set_index(), чтобы сделать df.known_divisions True для кадра данных dask.Но это трудоемкая операция.Есть ли лучший быстрый способ сделать то, что я пытаюсь сделать?Могу ли я сделать это с помощью самой панды?

Конечная цель - найти строки из ndf, где любая из соответствующих строк из df соответствует некоторым критериям.

Ответы [ 2 ]

0 голосов
/ 26 февраля 2019

Ваш основной алгоритм: «Я бы хотел, чтобы первые 10 значений df['Node'] были установлены на первое значение ndf, следующие 10 значений - на следующее значение ndf и т. Д.».Причина, по которой это трудно сделать в Dask, заключается в том, что он не знает, сколько строк в каждом разделе: вы читаете из CSV, а количество строк, которые вы получаете в байтах X, зависит от того, какими именно являются данные в каждой части.,Другие форматы дают вам больше информации ...

Поэтому вам, безусловно, понадобятся два прохода через данные.Вы можете поработать с индексом, выяснить деления и, возможно, провести некоторую сортировку.На мой взгляд, самое простое, что вы можете сделать, это просто измерить длину деления и получить смещение начала каждого:

lengths = df.map_partitions(len).compute()
offsets = np.cumsum(lengths.values)
offsets -= offsets[0]

и теперь использовать пользовательскую функцию задержки для работы с деталями

@dask.delayed
def add_node(part, offset, ndf):
    index = pd.Series(range(offset, offset + len(part)) // 10,
                      index=part.index)  # 10 is the repeat factor
    part['Node'] = index.map(ndf)
    return part

df2 = dd.from_delayed([add_node(d, off, ndf) 
                       for d, off in zip(df.to_delayed(), offsets)])
0 голосов
/ 19 февраля 2019

Используя тот же рабочий процесс, вы можете вручную установить divisions, как предлагается здесь

import dask.dataframe as dd
import pandas as pd
import numpy as np

pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv", index=False)
df = dd.read_csv("tempfile.csv")
ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))


df.divisions = (0, len(df)-1)
df["Note"] = dd.from_array(np.repeat(ndf.values, 10))

Я не думаю, что использование np.repeat очень эффективно, особенно для больших значений df.

...