как обеспечить многопроцессорность кода с использованием настроенных ядер процессора? - PullRequest
0 голосов
/ 19 июня 2020

Я использую multiprocessing Pool для запуска parallel. Я пробовал сначала с 4 ядрами, с HPC с sub. Когда он использует 4 ядра, время сокращается в 4 раза по сравнению с 1 ядром. Когда я проверяю с помощью qstat, несколько раз он использует 4 ядра, но после этого только 1 ядро ​​с точно таким же кодом.

Не могли бы вы посоветовать, что не так с моим кодом или системой?

import pandas as pd
import numpy as np
from multiprocessing import Pool
from datetime import datetime

t1 = pd.read_csv("template.csv",header=None)

s1 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_adfr.csv")
s2 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_dock.csv")
s3 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_gemdock.csv")
s4 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_ledock.csv")
s5 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_plants.csv")
s6 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_psovina.csv")
s7 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_quickvina2.csv")
s8 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_smina.csv")
s9 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_vina.csv")
s10 = pd.read_csv("/home/donp/dude_1000_raw_raw/dude_1000_raw_raw_vinaxb.csv")

#number of core and arrays

n = 4
m = (len(t1) // n)+1
g= m*n - len(t1)
for g1 in range(g):
    t1.loc[len(t1)]=0


results=[]

def block_linear(i):

    temp = pd.DataFrame(np.zeros((m,29)))

    for a in range(0,m):
        sum_matrix = (t1.iloc[a,0]*s1) + (t1.iloc[a,1]*s2) + (t1.iloc[a,2]*s3)+ (t1.iloc[a,3]*s4) + (t1.iloc[a,4]*s5) + (t1.iloc[a,5]*s6) + (t1.iloc[a,6]*s7) + (t1.iloc[a,7]*s8) + (t1.iloc[a,8]*s9) + (t1.iloc[a,9]*s10)
        rank_sum= pd.DataFrame.rank(sum_matrix,axis=0,ascending=True,method='min') #real-True
        temp.iloc[a,:] = rank_sum.iloc[999].values
    temp['median'] = temp.median(axis=1)
    temp.index = range(i*m,(i+1)*m)
    return temp

start=datetime.now()

if __name__ == '__main__':
    pool = Pool(processes=n)
    results = pool.map(block_linear,range(0,n))

print(datetime.now()-start)

out=pd.concat(results)
out.drop(out.tail(g).index,inplace=True)
out.to_csv('test_10dock_4core.csv',index=False)

Основная идея состоит в том, чтобы разрезать большую таблицу на более мелкие, проводить вычисления и объединять вместе.

1 Ответ

1 голос
/ 19 июня 2020

Без более детального использования пакета Multiprocessing Pool действительно сложно понять и помочь. Обратите внимание, что пакет Pool не гарантирует распараллеливание: функция _apply , например, использует только одного рабочего из пула и блокирует все ваши исполнения. Вы можете узнать больше об этом здесь и там .

Но, если вы правильно используете библиотеку, вы должны убедиться, что ваш код полностью распараллеливается: Операция ввода-вывода на диске, например, может затруднить параллелизацию и, таким образом, заставить ваш код работать только в одном процессе за раз.

Надеюсь, это помогло.


[Edit ] Поскольку вы предоставили более подробную информацию о своей проблеме, я могу дать более подробные c советы:

Во-первых, ваш код является нулевым параллельным. Вы просто вызываете одну и ту же функцию N раз. Не так должна работать многопроцессорная обработка. Вместо этого, часть, которая должна быть параллельна, - это та, которая обычно находится в циклах для , как и та, что у вас внутри block_linear ().

Итак, что я вам рекомендую:

Вы должны изменить свой код, чтобы сначала вычислить всю вашу взвешенную сумму и только после этого выполнять остальные операции. Это очень поможет с распараллеливанием. Итак, поместите эту операцию в функцию:

  def weighted_sum(column,df2):
    temp = pd.DataFrame(np.zeros(m))
    for a in range(0,m):
        result = (t1.iloc[a,column]*df2)
        temp.iloc[a] = result
    return temp

Итак, вы используете pool.starmap для параллельного выполнения функции для 10 имеющихся у вас фреймов данных, примерно так:

results = pool .starmap (weighted_sum, [(0, s1), (1, s2), (2, s3), ...., [9, s10]]])

ps: pool.starmap похож на pool.map , но принимает список аргументов кортежа. Вы можете получить более подробную информацию об этом здесь .

Наконец, но не в последнюю очередь, вы должны обработать свои результаты, чтобы завершить свои вычисления. Поскольку у вас будет одна weighted_sum на столбец, вы можете применить сумму по столбцам, а затем rank_sum. код, чтобы иметь преимущество в многопроцессорности. Я рекомендую вам протестировать его на подвыборке фреймов данных, чтобы убедиться, что он работает правильно, прежде чем запускать его для всех ваших данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...