Python multiprocessing.Pool медленнее, чем последовательное выполнение - PullRequest
0 голосов
/ 26 ноября 2018

Я пытаюсь написать программу, которая работает с длинным списком элементов (в примере кода этот список называется training_set ).Каждая строка списка содержит два числа, которые должны быть найдены в другом списке с именем IDs : следовательно, моя программа выполняет итерации по строкам training_set и для каждого из них находитсоответствующие 2 числа в идентификаторах , а затем выполняет еще несколько вычислений (не показано в коде).

При последовательном выполнении это требует около 300 с.Поскольку каждая строка training_set не зависит от других, я думал о распараллеливании вычислений путем разделения входных данных с помощью #cpu_cores, используя multiprocessing.Pool .Однако распараллеленная версия работает медленнее, чем последовательная.

num_procs = int(multiprocessing.cpu_count())

with open("training_set.txt", "r") as f:
    reader = csv.reader(f)
    training_set  = list(reader)
training_set = [element[0].split(" ") for element in training_set]

with open("node_information.csv", "r") as f:
    reader = csv.reader(f)
    node_info  = list(reader)
IDs = [element[0] for element in node_info]

batch_size = int(len(training_set)/num_procs)
inputs=[]

# split list into batches to feed to the different threads
for i in range(num_procs):
    if i == (num_procs-1): inputs.append(list(training_set[int(i*batch_size):(len(training_set)-1)]))
    else: inputs.append(list(training_set[int(i*batch_size): int((i+1)*batch_size)]))

def init(IDs):
    global identities
    identities = copy.deepcopy(IDs)

def analyze_pairs(partialList):
    pairsSet = copy.deepcopy(partialList)
    for i in range(len(pairsSet)):
        source = pairsSet[i][0] # an ID of edges
        target = pairsSet[i][1] # an ID of edges
        ## find an index maching to the source ID
        index_source = identities.index(source)
        index_target = identities.index(target)
        ***additional computation***

if __name__ == '__main__':
    pool = Pool(num_procs, initializer=init, initargs=(IDs,))
    training_features = pool.map(analyze_pairs, inputs)

Я не показываю остальную часть кода цикла for (в конце analysis_pairs () )потому что проблема сохраняется, даже если я удаляю этот код, следовательно, проблема не в этом.)

Я знаю, что по этой теме уже есть много вопросов, но я не смог найти решение для своего случая,
Я не думаю, что здесь параллелизм вносит больше накладных расходов, чем ускорение, потому что вход каждого потока велик (на процессоре с 8 потоками, каждый поток должен занимать не менее 35 с) и нет явной передачи сообщений.Я также попытался использовать copy.deepcopy , чтобы убедиться, что каждый поток работает в отдельном списке (хотя это не должно быть проблемой, поскольку каждый поток выполняет только действия чтения из списка), но этоне работал
В чем может быть проблема?Заранее спасибо.

...