Как исправить 'ValueError: все ключи должны быть одинаковой формы' с помощью Dask groupby - PullRequest
0 голосов
/ 01 мая 2019

Я пытаюсь использовать groupby() на dask dataframe, но получаю ValueError , упомянутую в заголовке, при увеличении количества разделов. groupby() применяется к неиндексным столбцам .

Фактический набор данных - это CSV с ~ 14 000 000 строк. Я экспериментировал с меньшей выборкой из 140 000 строк, и все отлично работает для npartition = {2,3 или 4} , однако установка npartition = 5 приводит к ValueError.
Чтение всего CSV (14 миллионов строк) с помощью "dd.read_csv ()" и установка npartition для любого произвольного значения (npartition = 40 выбирается dask автоматически) также приводит к ValueError.

Я также пытался использовать set_index () для неиндексного столбца "vendor_id", который возвращает пустой фрейм данных - чего я тоже не понимаю

Я использую Ubuntu 18.04 Fresh Conda. Dask 1.2.0 Панды 0.24.2 numpy 1.16.3

Пример кода:

import numpy as np
import pandas as pd
import dask.dataframe as dd


#small sample df:
dict_sample= {'vendor_id': {0: 'CMT',  1: 'CMT',  2: 'CMT',  3: 'CMT',  4: 'VTS',  5: 'VTS',  6: 'VTS',  7: 'VTS',  8: 'CMT',  9: 'VTS',  10: 'VTS',  11: 'VTS',  12: 'VTS',  13: 'VTS'},
 'pickup_datetime': {0: '2009-01-22 11:21:35',  1: '2009-01-22 21:17:22',  2: '2009-01-09 22:25:13',  3: '2009-01-23 17:20:01',  4: '2009-01-24 23:18:00',  5: '2009-01-26 22:03:00',  6: '2009-01-02 15:58:00',  7: '2009-01-16 19:38:00',  8: '2009-01-13 18:57:06',  9: '2009-01-09 14:51:00',  10: '2009-01-14 18:15:00',  11: '2009-01-02 23:17:00',  12: '2009-01-31 09:59:00',  13: '2009-01-19 14:57:00'},
 'passenger_count': {0: 1,  1: 1,  2: 2,  3: 1,  4: 1,  5: 1,  6: 5,  7: 5,  8: 1,  9: 1,  10: 1,  11: 1,  12: 1,  13: 1}}


df_ = pd.DataFrame(dict_sample)


#dask:

ddf_raw = dd.from_pandas(df_, npartitions=3)

ddf_raw['pickup_datetime'] = dd.to_datetime(ddf_raw['pickup_datetime'])


#using groupby:
date_grouper = pd.Grouper(key='pickup_datetime',freq='1H',label="left")
ddf_raw_grouped = ddf_raw.groupby(['vendor_id',date_grouper]).passenger_count.count()

ddf_raw_grouped.head()

с использованием 140 000 строк и npartitions = 5 результатов:

...sorter = np.lexsort((labels, self.indexer))

ValueError: all keys need to be the same shape

пытается set_index ()

ddf_raw = ddf_raw.set_index('vendor_id')
ddf_raw.head()

возвращает

Empty DataFrame
Columns: [pickup_datetime, passenger_count]
Index: []

Я довольно новичок в dask, и я не понимаю, что вызывает ValueError . Использование groupby () в pandas с выборкой из 140 000 строк работает без проблем.

Кроме того, я заметил, что игра с npartitions также меняет конечный результат "passenger_count.count ()"

Наконец, я попытался повторить ошибку с другим набором данных

import dask
df_dask = dask.datasets.timeseries()

df_dask = df_dask.repartition(npartitions= 20)
f = pd.Grouper(key='timestamp',freq='1H',label="left")
g = pd.Grouper(key='name')
df_dask = df_dask.groupby([g, f]).x.count()
df_dask.head()

В этом случае groupby () работает без каких-либо проблем, но окончательный результат "x.count ()" изменяется с различными значениями npartitions .

Я не вижу, что не так с моим CSV

## ОБНОВЛЕНИЕ:

Мне удалось использовать groupby () со следующим обходным решением, но в другом столбце ("pickup_id"):


ddf_raw = ddf_raw.set_index("pickup_id")

ddf_raw = ddf_raw.persist()

def groupby_date_pickup(df,grouper_1,grouper_2):

    df_local = df[[grouper_1,grouper_2]].copy()

    date_grouper = pd.Grouper(key=grouper_1,freq='1H',label="left")
    df_local_grouped = df_local.groupby([df_local.index, date_grouper])[grouper_2].count()

    print("Grouping Step")
    return df_local_grouped


grouped_ddf = ddf_raw.map_partitions(groupby_date_pickup, "pickup_datetime", "passenger_count")


...