Нахождение максимального минимального значения внутри группы в многораздельном фрейме данных Dask - PullRequest
0 голосов
/ 22 апреля 2020

Мне трудно использовать разделы Dask при работе с очень большими фреймами данных. Представьте себе 200 ГБ CSV, который содержит журнал поездок на такси. Я загружаю данные следующим образом:

df = dd.read_csv("/data/taxi_data_big.tsv", sep="\t")

Затем для каждого водителя я хочу найти самую раннюю поездку в аэропорт (DestinationId == 7).

df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]]

Кадр данных df1 будет выглядеть примерно так:

2020-01-01    D1    T1    8    7
2020-01-01    D1    T2    11   7
2020-01-01    D1    T3    44   7
2020-01-02    D1    T4    8    7
2020-01-02    D1    T5    13   7
2020-01-01    D2    T77   20   7
2020-01-01    D2    T177  76   7

1 января 2020 года 20-я и 76-я поездки Driver D2 были в аэропорт.

Для моего анализа мне нужно найти среднее количество поездок, которые водитель совершает перед поездкой в ​​аэропорт.

df2 = df1.groupby('TripId').TripId_Rank.idxmin() даст мне TripId и индекс первой поездки на airport.

df4 = df2.loc[df3], чтобы выбрать соответствующие строки. Это работает для небольшого набора данных, но когда я перехожу к большому набору данных, я получаю "ValueError: Not all divisions are known, can't align partitions" when performing math on dataframe column.

Если я правильно понимаю, ошибка вызвана загрузкой кадра данных в несколько разделов и Dask. документация требует явного индекса, установленного для кадра данных.

df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]].compute()

df1['id'] = np.arange(len(df2)) # explicitly add index column to the dataframe
df1 = df1.set_index("id") # is this really necessary? This takes hours to complete

df2 = df1.groupby('TripId').TripId_Rank.idxmin()
df4 = df2.loc[df3]
df

Приведенный выше код работает, но мне интересно, есть ли лучшее решение этой проблемы. Добавление столбца id к кадру данных очень медленное , и я не уверен, что приведенный выше код использует параллелизацию Dask.

Заранее спасибо.

1 Ответ

0 голосов
/ 23 апреля 2020

Один из подходов, который может сработать, - это применить применить к сгруппированным данным.

df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]]

df2['idx'] = df2.index

def foo(grouped_df):
    row_with_min_cnt_index = grouped_df['Impression_Rank'].idxmin()
    row_with_min_cnt = grouped_df.loc[row_with_min_cnt_index]
    return row_with_min_cnt['idx']

keep_ids = df2.groupby('DriverId').apply(foo, meta=('x', 'f8')).compute()
df2[df2['idx'].isin(keep_ids)].compute()

Однако имейте в виду, что добавление столбца idx в существующий фрейм данных займет много времени.

...