Multiprocessing Pool.apply выполняется n-1 раз - PullRequest
0 голосов
/ 31 января 2019

У меня проблемы с multiprocessing.Pool.apply.
Моя цель состоит в том, чтобы иметь 5 процессов, каждый из которых заполняет массив 100 элементами (по 100 для этого теста), а затем объединяет массивы в один с длиной 500.Проблема в том, что по какой-то причине я могу понять, что в нем всего 400 элементов.

Я пытался изменить количество процессов, созданных пулом, но это ничего не изменило, кроме времени выполнения.

import torch.multiprocessing as mp
import itertools

pool = mp.Pool(processes=5)
split = int(500/5)
lst =  pool.apply(RampedGraph, (split,[]))    #each foo returns a list of 100 elements
lst = list(itertools.chain.from_iterable(lst)) #merging the lists into one

len(lst)
>>>400

Ожидаемый результат len(lst) должен быть 500.
Может кто-нибудь просветить меня, что я делаю неправильно?

РЕДАКТИРОВАТЬ Метод Foo объяснил:

def RampedGraph(popsize, graph_lst):
    cyclic_size = int(math.ceil(popsize/2))
    acyclic_size = popsize - full_size
    append = graph_lst.append
    for _ in range(cyclic_size):
        t = c.Node().cyclic()
        nn = c.number_of_nodes()
        c = c.calculate(0, False)
        append((t,nn,c))
    for _ in range(acyclic_size):
        t = c.Node().acyclic()
        nn = c.number_of_nodes()
        c = c.calculate(0, False)
        append((t,nn,c))
    return graph_lst

1 Ответ

0 голосов
/ 31 января 2019
import torch.multiprocessing as mp
# import multiprocessing as mp
import itertools

def RampedGraph(popsize, graph_lst):
    print(mp.current_process().name)
    return list(range(100))

num_workers = 5
pool = mp.Pool(processes=num_workers)
split = int(500/num_workers)
lst =  pool.starmap(RampedGraph, [(split,[])]*num_workers)
lst = list(itertools.chain.from_iterable(lst)) 
print(len(lst))
# 500

pool.starmap(RampedGraph, [(split,[])]*5) отправляет 5 задач в пул задач.Это заставляет RampedGraph(split, []) вызываться 5 раз одновременно.5 результатов, возвращаемых RampedGraph, собираются в список, lst.

Обратите внимание, что одновременный вызов RampedGraph 5 раз не гарантирует, что будут использованы все 5 процессоров.Например, если RampedGraph закончить очень быстро, возможно, что один процессор обрабатывает более одной задачи, и, возможно, другой процессор вообще никогда не используется.Однако, если RampedGraph занимает нетривиальное количество времени, в целом можно ожидать, что будут использованы все 5 рабочих процессов.

Примечание. Я запускал приведенный выше код с import multiprocessing as mp вместо import torch.multiprocessing as mp,Но поскольку torch.multiprocessing должна быть заменой multiprocessing, это не должно иметь значения.


Использование multiprocessing сопряжено с затратами и преимуществами.Преимущество, конечно же, заключается в возможности одновременного использования нескольких процессоров.Стоимость включает время, необходимое для запуска дополнительных процессов, и стоимость межпроцессного взаимодействия.multiprocessing использует Queues для передачи аргументов в функцию, выполняемую рабочими процессами, и для передачи возвращенного значения обратно в основной процесс.Чтобы транспортировать возвращенные значения через очереди, объекты сериализуются в байты посредством протравливания.Если выбранные объекты, отправляемые через очереди, велики, это может привести к значительным накладным расходам при использовании многопроцессорной обработки.Обратите внимание, что все эти затраты не связаны с эквивалентной последовательной версией того же кода.

В частности, когда функция, выполняемая рабочими процессами, быстро завершается, накладные расходы могут доминировать в общем времени выполнения программы, создавая кодкоторый использует многопроцессорность медленнее, чем последовательная версия того же кода.

Таким образом, ключ к скорости при использовании многопроцессорности - попытка минимизировать межпроцессное взаимодействие и убедиться, что рабочие процессы выполняют большую работу, поэтому накладные расходызатраты становятся относительно малой частью общего времени выполнения.

...