Параллельная обработка Dask намного медленнее, чем использование пафосной многопроцессорной обработки. - PullRequest
0 голосов
/ 10 февраля 2020

Я относительный новичок ie, использующий dask, и я пытался понять, как я могу использовать многопроцессорность dask выше и выше пафосной многопроцессорности. К моему удивлению, напиток в 3-4 раза медленнее пафоса. Я явно делаю что-то не так и буду признателен за любые указания по этому вопросу.

Ниже приведен код, в котором я пытаюсь настроить тривиальную арифметику c манипуляции:

from pathos.multiprocessing import ProcessingPool as Pool
import pandas as pd, numpy as np,time
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()

class test_pathos:
    def __init__(self):
        self.NumCols = 270
        self.NumRows = 250000
        self.cols = ['Col'+ str(i) for i in range(self.NumCols)]
        self.data = pd.DataFrame(np.random.randint(0,5,size=(self.NumRows,self.NumCols)),columns=self.cols)


    def ProcessCol(self,x):
        colname = x.name
        DQCol = colname + r'_DQ'
        self.data.loc[:, DQCol] = self.data[colname] + 1

    def AddTodata(self,colname):
        DQColumn = colname+r'_DQ'
        self.data.loc[:,DQColumn] = self.data[colname]+1
        return self.data[DQColumn]

    def AddProcess(self):
        p = Pool(nodes = nCores)
        ChangedCols = p.map(self.AddTodata,self.cols)
        ChangedColsDf = pd.concat(ChangedCols,axis=1)
        self.data = pd.concat([self.data,ChangedColsDf],axis=1)

    def AddProcess_apply(self):
        '---self.data.apply(self.ProcessCol)'
        dd.from_pandas(self.data, npartitions=nCores).map_partitions(lambda df : df.apply(self.ProcessCol)).compute(scheduler='processes')


'----------------------------------------------------MAIN---------------------------------------------------------------------'
if __name__ == "__main__":
    test_obj = test_pathos()
    tinit = time.time()
    shapebeforetransmutation = test_obj.data.shape
    test_obj.AddProcess()
    shapeaftertransmutation = test_obj.data.shape
    print('Pathos call time is :', time.time() - tinit)
    tinit = time.time()
    test_obj.AddProcess_apply()
    print('Dask call time is : ', time.time() - tinit)

1 Ответ

0 голосов
/ 16 февраля 2020

Производительность зависит от многих вещей. Это могут быть накладные расходы, передача данных или что-то еще. К сожалению, это не очень легко сказать, фактически не переделав эксперимент.

Для лучшего понимания производительности Dask, я рекомендую прочитать документы Понимание производительности

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...