Масштабирование и объединение данных Pandas в Dask DataFrame - PullRequest
0 голосов
/ 10 мая 2019

У меня довольно большой кадр данных для панд df. У меня также есть ряд масштабных коэффициентов панд factors.

Я хочу масштабировать df для каждого масштабного коэффициента в factors и объединять эти кадры данных вместе в больший кадр данных. Поскольку этот большой массив данных не помещается в памяти, я подумал, что для него может быть полезно использовать dask dataframe. Но я не знаю, как обойти эту проблему.

Ниже приведено то, чего я хочу достичь, но с использованием панелей данных панд. dflarge в данном случае не помещается в память.

import random
import pandas as pd

df = pd.DataFrame({
        'id1': range(1,6), 
        'a': [random.random() for i in range(5)], 
        'b': [random.random() for i in range(5)],
    })
df = df.set_index('id1')

factors = [random.random() for i in range(10)]

dflist = []

for i, factor in enumerate(factors):
    scaled = df*factor
    scaled['id2'] = i
    dflist.append(scaled)

dflarge = pd.concat(dflist)
dflarge = dflarge.reset_index().set_index(['id1', 'id2'])

Я хотел бы сделать масштабирование и конкатенацию максимально эффективными, поскольку будут иметься десятки тысяч масштабных коэффициентов. Я хотел бы запустить его, если возможно.

Я действительно ценю любую помощь, которую вы можете оказать.

1 Ответ

2 голосов
/ 10 мая 2019

Просто задержите это!

Dask.dataframe и dask.delayed - вот что вам нужно, и запуск с использованием dask.distributed должен работать нормально. Предполагая, что df по-прежнему pandas.DataFrame, превратите цикл в функцию, которую вы можете вызвать в понимании списка, используя dask.delayed. Я внес небольшие изменения в ваш код ниже:

import random
import pandas as pd
import dask.dataframe as dd
from dask import delayed

df = pd.DataFrame({
        'id1': range(1,6), 
        'a': [random.random() for i in range(5)], 
        'b': [random.random() for i in range(5)],
    })
df = df.set_index('id1')

factors = [random.random() for i in range(10)]

dflist = []

def scale_my_df(df_init, scale_factor, id_num):
    '''
    Scales and returns a DataFrame.
    '''
    df_scaled = df_init * scale_factor
    df_scaled['id2'] = id_num
    return df_scaled

dfs_delayed = [delayed(scale_my_df)(df_init=df, scale_factor=factor, id_num=i) 
               for i, factor in enumerate(factors)]
ddf = dd.from_delayed(dfs_delayed)

И теперь у вас есть dask.DataFrame, построенный из ваших масштабированных pandas.DataFrame с. Следует отметить две вещи:

  1. Dask является ленивым, так что на конец этого фрагмента кода ничего не было вычислено. Был создан вычислительный граф с необходимыми операциями для создания необходимого DataFrame. В этом примере с небольшими DataFrames вы можете выполнить:

    ddf_large = ddf.compute()

И у вас будет тот же pandas.DataFrame, что и dflarge в приведенном выше коде, при условии, что factors совпадают. Почти ...

  1. На момент написания статьи dask не поддерживает многоуровневые индексы, поэтому ваш код .set_index(['id1', 'id2']) не будет работать. Это было поднято в выпуске # 1493 , и есть некоторые обходные пути, если вам действительно нужен многоуровневый индекс.

РЕДАКТИРОВАТЬ:

  1. Если исходные данные df действительно велики, как, например, при увеличении вашей памяти, необходимо преобразовать их в .csv или другой pandas -читаемый формат и встроить их в функцию масштабирования, то есть:
    def scale_my_df(df_filepath, scale_factor, id_num):
        '''
        Scales and returns a DataFrame.
        '''
        df_init = pd.read_csv(df_filepath)
        df_scaled = df_init * scale_factor
        df_scaled['id2'] = id_num
        return df_scaled

И откорректируйте остальную часть кода соответственно. Идея dask состоит в том, чтобы хранить данные в памяти, но есть некоторые накладные расходы, связанные с построением вычислительного графа и хранением промежуточных значений.

...