DASK: Typerrror: назначение столбца не поддерживает тип numpy.ndarray, тогда как Pandas работает нормально - PullRequest
0 голосов
/ 06 октября 2019

Я использую Dask для чтения в 10-метровой строке csv + и выполнения некоторых вычислений. Пока он оказывается в 10 раз быстрее, чем Панды.

У меня есть фрагмент кода ниже, который при использовании с пандами работает нормально, но с dask выдает ошибку типа. Я не уверен, как преодолеть опечатку . Кажется, что массив передается обратно в фрейм данных / столбец с помощью функции select при использовании dask, но не при использовании pandas? Но я не хочу переключать все это обратно на панд и терять 10-кратное преимущество в производительности.

Этот ответ является результатом некоторой помощи некоторых других по переполнению стека, однако я думаю, что этот вопрос достаточно далеко отклонился от первоначального вопроса, что это в целом отличается. Код ниже.

PANDAS: Работает Время, прошедшее без AndHeathSolRadFact: 40 секунд

import pandas as pd
import numpy as np

from timeit import default_timer as timer
start = timer()
df = pd.read_csv(r'C:\Users\i5-Desktop\Downloads\Weathergrids.csv')
df['DateTime'] = pd.to_datetime(df['Date'], format='%Y-%d-%m %H:%M')
df['Month'] = df['DateTime'].dt.month
df['Grass_FMC'] = (97.7+4.06*df['RH'])/(df['Temperature']+6)-0.00854*df['RH']+3000/df['Curing']-30


df["AndHeathSolRadFact"] = np.select(
    [
    (df['Month'].between(8,12)),
    (df['Month'].between(1,2) & df['CloudCover']>30)
    ],  #list of conditions
    [1, 1],     #list of results
    default=0)    #default if no match



print(df.head())
#print(ddf.tail())
end = timer()
print(end - start)

DASK: BROKEN Время, принятое без AndHeathSolRadFact: 4 секунды

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
import pandas as pd
import numpy as np

# Dataframes implement the Pandas API
import dask.dataframe as dd



from timeit import default_timer as timer
start = timer()
ddf = dd.read_csv(r'C:\Users\i5-Desktop\Downloads\Weathergrids.csv')
ddf['DateTime'] = dd.to_datetime(ddf['Date'], format='%Y-%d-%m %H:%M')
ddf['Month'] = ddf['DateTime'].dt.month
ddf['Grass_FMC'] = (97.7+4.06*ddf['RH'])/(ddf['Temperature']+6)-0.00854*ddf['RH']+3000/ddf['Curing']-30



ddf["AndHeathSolRadFact"] = np.select(
    [
    (ddf['Month'].between(8,12)),
    (ddf['Month'].between(1,2) & ddf['CloudCover']>30)
    ],  #list of conditions
    [1, 1],     #list of results
    default=0)    #default if no match



print(ddf.head())
#print(ddf.tail())
end = timer()
print(end - start)


Ошибка

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-50-86c08f38bce6> in <module>
     29     ],  #list of conditions
     30     [1, 1],     #list of results
---> 31     default=0)    #default if no match
     32 
     33 

~\Anaconda3\lib\site-packages\dask\dataframe\core.py in __setitem__(self, key, value)
   3276             df = self.assign(**{k: value for k in key})
   3277         else:
-> 3278             df = self.assign(**{key: value})
   3279 
   3280         self.dask = df.dask

~\Anaconda3\lib\site-packages\dask\dataframe\core.py in assign(self, **kwargs)
   3510                 raise TypeError(
   3511                     "Column assignment doesn't support type "
-> 3512                     "{0}".format(typename(type(v)))
   3513                 )
   3514             if callable(v):

TypeError: Column assignment doesn't support type numpy.ndarray

Образец Weathegrids CSV

Location,Date,Temperature,RH,WindDir,WindSpeed,DroughtFactor,Curing,CloudCover
1075,2019-20-09 04:00,6.8,99.3,143.9,5.6,10.0,93.0,1.0 
1075,2019-20-09 05:00,6.4,100.0,93.6,7.2,10.0,93.0,1.0
1075,2019-20-09 06:00,6.7,99.3,130.3,6.9,10.0,93.0,1.0
1075,2019-20-09 07:00,8.6,95.4,68.5,6.3,10.0,93.0,1.0
1075,2019-20-09 08:00,12.2,76.0,86.4,6.1,10.0,93.0,1.0

1 Ответ

0 голосов
/ 06 октября 2019

Этот ответ не элегантный, но функциональный.

Я обнаружил, что функция выбора была примерно на 20 секунд быстрее в наборе данных 11-метрового ряда в пандах. Я также обнаружил, что даже если бы я выполнял ту же функцию в dask, то результат вернул бы массив numpy (pandas). Dask по своей природе не может принять это, но есть возможность передавать кадры данных между dask и пандами.

Итак, я получаю выгоду от загрузки и преобразования даты в сумерках (4 секунды по сравнению с 40 секундами в пандах), преимущества выбора с использованием панд (40 секунд по сравнению с 60 секундами в темноте), и просто нужнопринять, что я буду использовать больше памяти.

Существует небольшая потеря времени при преобразовании между кадрами данных.

Наконец, мне нужно было убедиться, что я очистил кадры данных, так как python не очищал память между тестовыми прогонами и просто продолжал накапливать.

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
import pandas as pd
import numpy as np

# Dataframes implement the Pandas API
import dask.dataframe as dd

from timeit import default_timer as timer
start = timer()
ddf = dd.read_csv(r'C:\Users\i5-Desktop\Downloads\Weathergrids.csv')
#print(ddf.describe(include='all'))

#Wrangle the dates so we can interrogate them
ddf['DateTime'] = dd.to_datetime(ddf['Date'], format='%Y-%d-%m %H:%M')
ddf['Month'] = ddf['DateTime'].dt.month

#Grass Fuel Moisture Content
ddf['Grass_FMC'] = (97.7+4.06*ddf['RH'])/(ddf['Temperature']+6)-0.00854*ddf['RH']+3000/ddf['Curing']-30

#Convert to a Pandas DataFrame because dask was being slow with the select logic below
df = ddf.compute() 
del [ddf]

#ddf["AndHeathSolRadFact"] = np.select(
#Solar Radiation Factor - this seems to take 32 seconds. Why?
df["AndHeathSolRadFact"] = np.select(
    [
    (df['Month'].between(8,12)),
    (df['Month'].between(1,2) & df['CloudCover']>30)
    ],  #list of conditions
    [1, 1],     #list of results
    default=0)    #default if no match

#Convert back to a Dask dataframe because we want that juicy parallelism
ddf2 = dd.from_pandas(df,npartitions=4)
del [df]

print(ddf2.head())
#print(ddf.tail())
end = timer()
print(end - start)

#Clean up remaining dataframes
del [[ddf2]]
...