Варианты ускорения кода Python через распараллеливание / многопроцессорность - PullRequest
0 голосов
/ 20 февраля 2019

Ниже я собрал 4 способа завершить выполнение кода, которое включает сортировку и обновление Pandas Dataframes.

Я хотел бы применить лучшие методы для ускорения выполнения кода.Я использую лучшие доступные методы?


Не могли бы вы поделиться некоторыми мыслями по поводу следующих идей?

  1. Я перебираю фрейм данных, потому что процесс решения моей проблемы, кажется, вызываетдля этого.Будет ли значительное увеличение скорости при использовании Dask Dataframes?

  2. Может ли версия Dask Distributed получить выгоду от установки определенного числа рабочих, процессов, потоков на одного рабочего?Люди отмечают, что увеличение количества процессов вместо потоков (или наоборот) лучше всего подходит для некоторых случаев.

  3. Какая инфраструктура аппаратного обеспечения будет наиболее мощной для такого кода?Многопроцессорная версия еще быстрее на экземпляре AWS с большим количеством физических процессорных ядер.

    • Будет ли намного быстрее установка Kubernetes / AWS с Dask Distributed?
    • Может ли это быть легко адаптировано для запуска с помощью графического процессора локально или на экземпляре AWS с несколькими графическими процессорами?

Это время завершения для справки:

  • Обычный цикл For: 34 seconds
  • Задержка Dask: 21 seconds
  • Распределение Dask (локальный компьютер): 21 seconds
  • Многопроцессорная обработка: 10 seconds

from dask.distributed import Client
from multiprocessing import Pool
from dask import delayed
import pandas as pd
import numpy as np
client = Client()
import random
import dask

#Setting original input data that will be used in the functions
alist=['A','B','C','D','E','F','G','H','I']
set_table=pd.DataFrame({"A":alist,
                        "B":[i for i in range(1,10)],
                        "C":[i for i in range(11,20)],
                        "D":[0]*9})

#Assembled random list of combinations   
criteria_list=[]
for i in range(0,10000):
    criteria_list.append(random.sample(alist,6))

#Sorts and filters the original df
def one_filter_sorter(criteria):
    sorted_table=set_table[set_table['A'].isin(criteria)]
    sorted_table=sorted_table.sort_values(['B','C'],ascending=True)
    return sorted_table

#Exists to help the function below. Simplified for this example    
def helper_function(sorted_table,idx):
    if alist.index(sorted_table.loc[idx,'A'])>5:
        return True

#last function that retuns the gathered result    
def two_go_downrows(sorted_table):

    for idx, row in sorted_table.iterrows():
        if helper_function(sorted_table,idx)==True:
            sorted_table.loc[idx,'D'] = 100 - sorted_table.loc[idx,'C']

    res=sorted_table.loc[:,['A','D']].to_dict()
    return res

#--Loop version
result=[]    
for criteria in criteria_list:
    A=one_filter_sorter(criteria)
    B=two_go_downrows(A)
    result.append(B)

#--Multiprocessed version
result=[]    
if __name__ == '__main__':
    pool=Pool(processes=6)
    A=pool.map(one_filter_sorter, criteria)
    B=pool.map(two_go_downrows, A) 
    result.append(B)

#--Delayed version
result=[]    
for criteria in criteria_list:
    A=delayed(one_filter_sorter)(criteria) 
    B=delayed(two_go_downrows)(A) 
    result.append(B)
dask.compute(result)

#--Distributed version
A= client.map(one_filter_sorter,criteria_list) 
B= client.map(two_go_downrows,A)
client.gather(B)    

Спасибо

...