Просто задержите это!
Dask.dataframe
и dask.delayed
- вот что вам нужно, и запуск с использованием dask.distributed
должен работать нормально. Предполагая, что df
по-прежнему pandas.DataFrame
, превратите цикл в функцию, которую вы можете вызвать в понимании списка, используя dask.delayed
. Я внес небольшие изменения в ваш код ниже:
import random
import pandas as pd
import dask.dataframe as dd
from dask import delayed
df = pd.DataFrame({
'id1': range(1,6),
'a': [random.random() for i in range(5)],
'b': [random.random() for i in range(5)],
})
df = df.set_index('id1')
factors = [random.random() for i in range(10)]
dflist = []
def scale_my_df(df_init, scale_factor, id_num):
'''
Scales and returns a DataFrame.
'''
df_scaled = df_init * scale_factor
df_scaled['id2'] = id_num
return df_scaled
dfs_delayed = [delayed(scale_my_df)(df_init=df, scale_factor=factor, id_num=i)
for i, factor in enumerate(factors)]
ddf = dd.from_delayed(dfs_delayed)
И теперь у вас есть dask.DataFrame
, построенный из ваших масштабированных pandas.DataFrame
с. Следует отметить две вещи:
Dask
является ленивым, так что на конец этого фрагмента кода ничего не было вычислено. Был создан вычислительный граф с необходимыми операциями для создания необходимого DataFrame. В этом примере с небольшими DataFrames вы можете выполнить:
ddf_large = ddf.compute()
И у вас будет тот же pandas.DataFrame
, что и dflarge
в приведенном выше коде, при условии, что factors
совпадают. Почти ...
- На момент написания статьи
dask
не поддерживает многоуровневые индексы, поэтому ваш код .set_index(['id1', 'id2'])
не будет работать. Это было поднято в выпуске # 1493 , и есть некоторые обходные пути, если вам действительно нужен многоуровневый индекс.
РЕДАКТИРОВАТЬ:
- Если исходные данные
df
действительно велики, как, например, при увеличении вашей памяти, необходимо преобразовать их в .csv
или другой pandas
-читаемый формат и встроить их в функцию масштабирования, то есть:
def scale_my_df(df_filepath, scale_factor, id_num):
'''
Scales and returns a DataFrame.
'''
df_init = pd.read_csv(df_filepath)
df_scaled = df_init * scale_factor
df_scaled['id2'] = id_num
return df_scaled
И откорректируйте остальную часть кода соответственно. Идея dask
состоит в том, чтобы хранить данные в памяти, но есть некоторые накладные расходы, связанные с построением вычислительного графа и хранением промежуточных значений.