Как сбросить индекс на объединенном фрейме данных в Dask - PullRequest
1 голос
/ 23 апреля 2020

Я новичок в Даске и подумал, что это будет простая задача. Я хочу загрузить данные из нескольких файлов CSV и объединить их в один кадр данных Dask. в этом примере есть 5 CSV-файлов с 10 000 строк данных в каждом. Очевидно, я хочу дать объединенному фрейму данных уникальный индекс.

Итак, я сделал это:

import dask.dataframe as dd

# Define Dask computations
dataframes = [
    dd.read_csv(os.path.join(data_dir, filename)).set_index('Unnamed: 0')
    for filename in os.listdir(data_dir) if filename.endswith('.csv')
]

combined_df = dd.concat(dataframes).reset_index(drop=True)

Если я тогда сделаю combined_df.head().index, я получу это как ожидалось:

RangeIndex(start=0, stop=5, step=1)

Но combined_df.tail().index не соответствует ожидаемому:

RangeIndex(start=3252, stop=3257, step=1)

Дальнейшая проверка выявляет значения индекса на combined_df, состоящие из 15 отдельных серий длиной примерно 3256, что в сумме составляет 50000 Обратите внимание, что все файлы csv содержат индекс в первом столбце от 0 до 10000.

Что здесь происходит и как мне получить стандартный целочисленный индекс от 0 до 50000, который представляет собой суммарное общее число строки во всех файлах csv?

Фон

Если вам нужно проверить приведенный выше код, вот скрипт установки для создания некоторых файлов csv:

import os
import numpy as np
import pandas as pd

# Create 5 large csv files (could be too big to fit all in memory)
shape = (10000, 1000)

data_dir = 'data'
if not os.path.exists(data_dir):
    os.mkdir(data_dir)

for i in range(5):
    filepath = os.path.join(data_dir, f'datafile_{i:02d}.csv')
    if not os.path.exists(filepath):
        data = (i + 1) * np.random.randn(shape[0], shape[1])
        print(f"Array {i} size in memory: {data.nbytes*1e-6:.2f} MB")
        pd.DataFrame(data).to_csv(filepath)

ОБНОВЛЕНИЕ:

Похоже, возникает та же проблема с этим методом:

combined_df = dd.read_csv(os.path.join(data_dir, '*.csv'))
print(dd.compute(combined_df.tail().index)[0])
print(dd.compute(combined_df.reset_index(drop=True).tail().index)[0])

RangeIndex(start=3252, stop=3257, step=1)
RangeIndex(start=3252, stop=3257, step=1)

Мне кажется, reset_index метод производит тот же индекс.

1 Ответ

2 голосов
/ 24 апреля 2020

В версии dask , reset_index выполняет свою задачу отдельно (и одновременно) на каждом разделе, поэтому последовательные числа в индексе "перезапускаются" как некоторые точки на самом деле в начале каждого раздела.

Чтобы обойти это ограничение, вы можете:

  • Назначить новый столбец, заполненный 1 .
  • Установите индекс на cumsum () - 1 , вычисленный для этого столбца (к счастью, в отличие от reset_index , cumsum вычисляется на целом DataFrame).

Побочным эффектом является то, что имя индекса теперь является именем этого нового столбца. Если вы хотите очистить его, вы должны сделать это на уровне раздела, вызывая map_partitions .

Таким образом, весь код может быть:

ddf = ddf.assign(idx=1)
ddf = ddf.set_index(ddf.idx.cumsum() - 1)
ddf = ddf.map_partitions(lambda df: df.rename(index = {'idx': None}))

Обратите внимание, что назначить (idx = 1) в порядке, поскольку это единственное значение транслируется на длину всего кадра данных, поэтому каждый элемент в этом новом столбце будет установлено значение 1 , без моего знания того, сколько строк содержит DataFrame. Это одна из замечательных особенностей базового пакета Numpy, который существенно упрощает программирование как в Numpy, Pandas, так и в dask .

Затем вы можете запустить: ddf.compute(), чтобы увидеть результат.

...