Ошибка при применении .map_partition к столбцу над рамкой данных dask - PullRequest
0 голосов
/ 31 октября 2019

Недавно я решил быть более предприимчивым и попытаться изучить больше DASK-данных. Я пытаюсь применить определенную функцию к одному из столбцов данных, синтаксис, который я использую, следующий:

import pandas as pd
import dask.dataframe as dd
import dask.array as da

df_data = pd.DataFrame({'Column 1': [300,300,450,500,500,750,600,300, 150],'Column 2': [100,130,230,200,300,350,600,550,530], 'Column 3': [250, 300, 400, 500, 700,350, 750, 550, 600]})

def TestFunc(x):
    y = x*2 + abs(x/2 - x*3)
    return y

dd_data = dd.from_pandas(df_data, npartitions = 1)
data_test = dd.map_partitions(TestFunc,dd_data['Column 1'])
data_test.compute()

Естественно, это более простой пример, который я только что составил, чтобы показать, как у меняделали. Этот код работает хорошо, проблема в реальной ситуации, с которой я сталкиваюсь. Теперь у меня есть более сложный фрейм данных, где я хочу применить функцию к одному столбцу. Я применяю следующую функцию:

 def GetID(phase):
     nDataPoints = len(phase)
     myRanges = np.deg2rad(np.arange(0,360,6))
     phase[phase>np.deg2rad(354+3)] = 0
     ID = np.array([])
     for i in np.arange(0,nDataPoints):
         val = abs(myRanges-phase[i])
         iID = np.argmin(val)
         ID = np.append(ID, iID+1)
     return ID

Я могу применить функцию к столбцу с .map_partitions, проблема в том, что когда я пытаюсь использовать после .compute(), чтобы увидеть числовые результаты, яполучить ошибку Key error: 0. Я не понимаю, как у меня не было бы проблем с моим предыдущим более простым примером и с ситуацией, с которой я столкнулся.

Надеюсь, что мне удалось быть кратким и точным. Я был бы очень признателен за вашу помощь в этом! Предложения о том, что искать, также приветствуются

1 Ответ

1 голос
/ 03 ноября 2019

Я рекомендую попробовать вашу функцию на обычном фрейме данных Pandas, чтобы убедиться, что он работает правильно:

GetID(df.compute())

Если это сработает, то я бы затем попытался использовать однопоточный планировщик вместе с pdbмодуль для исследования трассировки

df.map_partitions(GetID).compute(scheduler='single-threaded')

Это легко сделать, если вы находитесь в IPython с магией %debug.

...