AttributeError: у объекта 'Series' нет атрибута 'columns' в Dask - PullRequest
1 голос
/ 14 мая 2019

У меня есть функция, которую нужно применить к некоторому фрейму данных для выполнения некоторых вычислений.Поскольку датафрейм довольно большой для ускорения вычислений, я решил выбрать Dask для параллельной обработки панд

У меня есть следующий код: https://pastebin.com/Zh672Wei

Основная проблема связана с этими строками:

crosses_data.apply((lambda row: calculate_vwap(row[0], row[1], row[2], vwap_data, row.name)), axis=1)

Код выше работает.Тот же код, но распараллеленный с dask, завершается неудачно с ошибкой. Объект Series не имеет атрибута columns.

dd.from_pandas(crosses_data,npartitions=4).map_partitions(
      lambda df : df.apply((lambda row: calculate_vwap(row[0], row[1], row[2], vwap_data, row.name)), axis=1)).\
   compute(scheduler=get)

Я использовал официальные документы для dask, и эта ошибка действительно имеет смысл.

1 Ответ

0 голосов
/ 15 мая 2019

Может быть, это какая-то магия, но решение следующее :. Функция output_vwap должна иметь вывод в виде вывода:

def calculate_vwap(ric_id, interval_start, interval_finish, vwap_data, row_n):
    some_tmp_vwap_interval_data = \
        vwap_data.query(
            'TKER == @ric_id and interval > @interval_start and interval < '
            '@interval_finish '
        )[['IVWP', 'INTV']]
    if sum(some_tmp_vwap_interval_data['INTV']):
        return \
            sum(
                vwap * volume for vwap, volume in
                zip(some_tmp_vwap_interval_data['IVWP'],
                    some_tmp_vwap_interval_data['INTV'])
            ) \
            / sum(some_tmp_vwap_interval_data['INTV']), \
            some_tmp_vwap_interval_data.IVWP.iloc[0], \
            some_tmp_vwap_interval_data.IVWP.iloc[-1], \
            some_tmp_vwap_interval_data.INTV.sum()

    return None

после этого этот вывод должен быть преобразован в фрейм данных:

pd.DataFrame(
                dd.from_pandas(crosses_data[[
                    'RIC', 'Interval_Start_Human',
                    'Interval_End_Human']],
                               npartitions=int(partitions_number)).
                map_partitions(
                    lambda df: df.apply((
                        lambda row: calculate_vwap(row[0],
                                                   row[1],
                                                   row[2],
                                                   vwap_data,
                                                   row.name)),
                                        axis=1)).
                compute(scheduler=get).values.tolist())
...