Я относительный новичок ie, использующий dask, и я пытался понять, как я могу использовать многопроцессорность dask выше и выше пафосной многопроцессорности. К моему удивлению, напиток в 3-4 раза медленнее пафоса. Я явно делаю что-то не так и буду признателен за любые указания по этому вопросу.
Ниже приведен код, в котором я пытаюсь настроить тривиальную арифметику c манипуляции:
from pathos.multiprocessing import ProcessingPool as Pool
import pandas as pd, numpy as np,time
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()
class test_pathos:
def __init__(self):
self.NumCols = 270
self.NumRows = 250000
self.cols = ['Col'+ str(i) for i in range(self.NumCols)]
self.data = pd.DataFrame(np.random.randint(0,5,size=(self.NumRows,self.NumCols)),columns=self.cols)
def ProcessCol(self,x):
colname = x.name
DQCol = colname + r'_DQ'
self.data.loc[:, DQCol] = self.data[colname] + 1
def AddTodata(self,colname):
DQColumn = colname+r'_DQ'
self.data.loc[:,DQColumn] = self.data[colname]+1
return self.data[DQColumn]
def AddProcess(self):
p = Pool(nodes = nCores)
ChangedCols = p.map(self.AddTodata,self.cols)
ChangedColsDf = pd.concat(ChangedCols,axis=1)
self.data = pd.concat([self.data,ChangedColsDf],axis=1)
def AddProcess_apply(self):
'---self.data.apply(self.ProcessCol)'
dd.from_pandas(self.data, npartitions=nCores).map_partitions(lambda df : df.apply(self.ProcessCol)).compute(scheduler='processes')
'----------------------------------------------------MAIN---------------------------------------------------------------------'
if __name__ == "__main__":
test_obj = test_pathos()
tinit = time.time()
shapebeforetransmutation = test_obj.data.shape
test_obj.AddProcess()
shapeaftertransmutation = test_obj.data.shape
print('Pathos call time is :', time.time() - tinit)
tinit = time.time()
test_obj.AddProcess_apply()
print('Dask call time is : ', time.time() - tinit)