Использование Dask с задержкой на маленьких / секционированных данных - PullRequest
0 голосов
/ 24 сентября 2019

Я работаю с данными временного ряда, которые отформатированы, поскольку каждая строка представляет собой отдельный экземпляр идентификатора / времени / данных.Это означает, что строки не соответствуют 1 к 1 для каждого идентификатора.Каждый идентификатор имеет много строк во времени.

Я пытаюсь использовать задержку dask для запуска функции на всей последовательности идентификаторов (имеет смысл, что операция должна быть в состоянии запускаться для каждого отдельного идентификатора одновременновремя, так как они не влияют друг на друга).Для этого я сначала перебираю каждый из тегов идентификатора, извлекаю / определяю местонахождение всех данных этого идентификатора (с .loc в пандах, так что это отдельный «мини» df), затем задерживаю вызов функции на мини dfдобавление столбца с задержанными значениями и добавление его в список всех мини-файлов.В конце цикла for я хочу вызвать dask.compute () сразу для всех mini-df, но по какой-то причине значения mini df все еще задерживаются.Ниже я опубликую некоторый псевдокод о том, что я только что пытался объяснить.

У меня такое ощущение, что это может быть не самый лучший способ сделать это, но это то, что имело смысл в то время, и я не могу понятьчто не так, поэтому любая помощь будет очень цениться.

Вот что я пытаюсь сделать:

list_of_mini_dfs = []
for id in big_df:

    curr_df = big_df.loc[big_df['id'] == id]
    curr_df['new value 1'] = dask.delayed(myfunc)(args1)
    curr_df['new value 2'] = dask.delayed(myfunc)(args2) #same func as previous line

    list_of_mini_dfs.append(curr_df)

list_of_mini_dfs = dask.delayed(list_of_mini_dfs).compute()

Concat all mini dfs into new big df.

Как вы можете видеть из кода, я должен добраться до моего большого /общий фрейм данных для извлечения последовательности данных каждого идентификатора, поскольку она разбросана по строкам.Я хочу иметь возможность вызывать отложенную функцию для данных этого единственного идентификатора, а затем возвращать значения из вызова функции в большой / общий фрейм данных.

В настоящее время этот метод не работает, когда я объединяю все мини-фреймы данных вместе, два значения, которые я задержал, все еще задерживаются, что заставляет меня думать, что это связано с тем, как я задерживаю функцию внутриDF и пытается вычислить список данных кадров.Я просто не вижу, как это исправить.

Надеюсь, это было относительно ясно, и спасибо за помощь.

1 Ответ

0 голосов
/ 24 сентября 2019

IIUC вы пытаетесь сделать что-то вроде transform, используя dask.

import pandas as pd
import dask.dataframe as dd
import numpy as np

# generate big_df
dates = pd.date_range(start='2019-01-01',
                      end='2020-01-01')
l = len(dates)
out = []
for i in range(1000):
    df = pd.DataFrame({"ID":[i]*l,
                       "date": dates,
                       "data0": np.random.randn(l),
                       "data1": np.random.randn(l)})

    out.append(df)

big_df = pd.concat(out, ignore_index=True)\
           .sample(frac=1)\
           .reset_index(drop=True)

Теперь вы хотите применить свою функцию fun к столбцам data0 и data1

Панды

out = big_df.groupby("ID")[["data0","data1"]]\
            .apply(fun)\
            .reset_index()

df_pd = pd.merge(big_df, out, how="left", on="ID" )

Dask

df = dd.from_pandas(big_df, npartitions=4)

out = df.groupby("ID")[["data0","data1"]]\
        .apply(fun, meta={'data0':'f8',
                          'data1':'f8'})\
        .rename(columns={'data0': 'new_values0',
                         'data1': 'new_values1'})\
        .compute() # Here you need to compute otherwise you'll get NaNs

df_dask = dd.merge(df, out,
                   how="left", 
                   left_on=["ID"],
                   right_index=True)

Версия Dask не обязательно быстрее, чем версия Pandas.В частности, если ваш df помещается в ОЗУ.

...