Как применить функцию к нескольким столбцам Dask Data Frame параллельно? - PullRequest
0 голосов
/ 31 августа 2018

У меня есть Dask Dataframe, для которого я хотел бы вычислить асимметрию для списка столбцов, и, если эта асимметрия превышает определенный порог, я исправляю его, используя преобразование журналов. Мне интересно, есть ли более эффективный способ заставить функцию correct_skewness() работать на нескольких столбцах параллельно, удалив цикл for в функции correct_skewness() ниже:

import dask
import dask.array as da 
from scipy import stats

# Create a dataframe 
df = dask.datasets.timeseries()

df.head()

                      id     name         x         y
timestamp
2000-01-01 00:00:00  1032   Oliver  0.018604  0.089191
2000-01-01 00:00:01  1032  Norbert  0.666689 -0.979374
2000-01-01 00:00:02   991   Victor  0.027691 -0.474660
2000-01-01 00:00:03   979    Kevin  0.320067  0.656949
2000-01-01 00:00:04  1087    Zelda -0.462076  0.513409


def correct_skewness(columns=None, max_skewness=2):
    if columns is None:
        raise ValueError(
            f"columns argument is None. Please set columns argument to a list of columns"
        )


    for col in columns:
        skewness = stats.skew(df[col])
        max_val = df[col].max().compute()
        min_val = df[col].min().compute()

        if abs(skewness) > max_skewness and (max_val > 1 or min_val < 0):
            delta = 1.0
            if min_val < 0:
                delta = max(1, -min_val + 1)
            df[col] = da.log(delta + df[col])
    return df

df = correct_skewness(columns=['x', 'y']) 

1 Ответ

0 голосов
/ 31 августа 2018

Есть пара вещей, которые вы можете сделать для улучшения параллелизма в этом примере:

Вы можете использовать dask.array.stats.skew вместо statsmodels.skew. Вам придется import dask.array.stats явно

Вы можете вычислить мин / макс всех столбцов в одном вычислении

    mins = [df[col].min() for col in cols]
    maxes = [df[col].min() for col in cols]
    skews = [da.stats.skew(df[col]) for col in cols]

    mins, maxes, skews = dask.compute(mins, maxes, skews)

Тогда вы можете выполнить свою if-логику и применить da.log в зависимости от ситуации. Это все еще требует двух проходов над вашими данными, но это должно быть хорошим улучшением по сравнению с тем, что у вас есть сейчас.

...