Мне трудно использовать разделы Dask при работе с очень большими фреймами данных. Представьте себе 200 ГБ CSV, который содержит журнал поездок на такси. Я загружаю данные следующим образом:
df = dd.read_csv("/data/taxi_data_big.tsv", sep="\t")
Затем для каждого водителя я хочу найти самую раннюю поездку в аэропорт (DestinationId == 7).
df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]]
Кадр данных df1 будет выглядеть примерно так:
2020-01-01 D1 T1 8 7
2020-01-01 D1 T2 11 7
2020-01-01 D1 T3 44 7
2020-01-02 D1 T4 8 7
2020-01-02 D1 T5 13 7
2020-01-01 D2 T77 20 7
2020-01-01 D2 T177 76 7
1 января 2020 года 20-я и 76-я поездки Driver D2 были в аэропорт.
Для моего анализа мне нужно найти среднее количество поездок, которые водитель совершает перед поездкой в аэропорт.
df2 = df1.groupby('TripId').TripId_Rank.idxmin()
даст мне TripId и индекс первой поездки на airport.
df4 = df2.loc[df3]
, чтобы выбрать соответствующие строки. Это работает для небольшого набора данных, но когда я перехожу к большому набору данных, я получаю "ValueError: Not all divisions are known, can't align partitions" when performing math on dataframe column.
Если я правильно понимаю, ошибка вызвана загрузкой кадра данных в несколько разделов и Dask. документация требует явного индекса, установленного для кадра данных.
df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]].compute()
df1['id'] = np.arange(len(df2)) # explicitly add index column to the dataframe
df1 = df1.set_index("id") # is this really necessary? This takes hours to complete
df2 = df1.groupby('TripId').TripId_Rank.idxmin()
df4 = df2.loc[df3]
df
Приведенный выше код работает, но мне интересно, есть ли лучшее решение этой проблемы. Добавление столбца id к кадру данных очень медленное , и я не уверен, что приведенный выше код использует параллелизацию Dask.
Заранее спасибо.