Я пытаюсь написать программу, которая работает с длинным списком элементов (в примере кода этот список называется training_set ).Каждая строка списка содержит два числа, которые должны быть найдены в другом списке с именем IDs : следовательно, моя программа выполняет итерации по строкам training_set и для каждого из них находитсоответствующие 2 числа в идентификаторах , а затем выполняет еще несколько вычислений (не показано в коде).
При последовательном выполнении это требует около 300 с.Поскольку каждая строка training_set не зависит от других, я думал о распараллеливании вычислений путем разделения входных данных с помощью #cpu_cores, используя multiprocessing.Pool .Однако распараллеленная версия работает медленнее, чем последовательная.
num_procs = int(multiprocessing.cpu_count())
with open("training_set.txt", "r") as f:
reader = csv.reader(f)
training_set = list(reader)
training_set = [element[0].split(" ") for element in training_set]
with open("node_information.csv", "r") as f:
reader = csv.reader(f)
node_info = list(reader)
IDs = [element[0] for element in node_info]
batch_size = int(len(training_set)/num_procs)
inputs=[]
# split list into batches to feed to the different threads
for i in range(num_procs):
if i == (num_procs-1): inputs.append(list(training_set[int(i*batch_size):(len(training_set)-1)]))
else: inputs.append(list(training_set[int(i*batch_size): int((i+1)*batch_size)]))
def init(IDs):
global identities
identities = copy.deepcopy(IDs)
def analyze_pairs(partialList):
pairsSet = copy.deepcopy(partialList)
for i in range(len(pairsSet)):
source = pairsSet[i][0] # an ID of edges
target = pairsSet[i][1] # an ID of edges
## find an index maching to the source ID
index_source = identities.index(source)
index_target = identities.index(target)
***additional computation***
if __name__ == '__main__':
pool = Pool(num_procs, initializer=init, initargs=(IDs,))
training_features = pool.map(analyze_pairs, inputs)
Я не показываю остальную часть кода цикла for (в конце analysis_pairs () )потому что проблема сохраняется, даже если я удаляю этот код, следовательно, проблема не в этом.)
Я знаю, что по этой теме уже есть много вопросов, но я не смог найти решение для своего случая,
Я не думаю, что здесь параллелизм вносит больше накладных расходов, чем ускорение, потому что вход каждого потока велик (на процессоре с 8 потоками, каждый поток должен занимать не менее 35 с) и нет явной передачи сообщений.Я также попытался использовать copy.deepcopy , чтобы убедиться, что каждый поток работает в отдельном списке (хотя это не должно быть проблемой, поскольку каждый поток выполняет только действия чтения из списка), но этоне работал
В чем может быть проблема?Заранее спасибо.