Совместная работа с Python - PullRequest
0 голосов
/ 19 октября 2019

У меня сложная процедура, которую я могу распараллелить. Параллельные трассы не пересекаются и независимы. В конце выполнения я хотел бы получить некоторые данные о пробеге в упорядоченном виде. Поскольку прогоны не пересекаются, и я знаю, сколько их прогонов, я попытался выделить массив и заполнить его None и заставить каждый поток манипулировать своим непересекающимся сектором. Тем не менее, я получаю массив None вместо того, что я хочу. Это просто упрощенный тривиальный пример:

from multiprocessing import Pool, Array
import math

bins = 1000
buckets = 4 # number of threads
dump = [None for i in range(bins)]

def iAmSlow(i):
  # I return a tuple, my return tuples are usually the same type
  # but there are some exceptions
  # (ie. double array of size 5, vs empty array)
  if i%13 == 0:
    return [], []
  return [k*i for k in range(5)], [k-i for k in range(5)]

def iAmParallelizable(args):
  start, end = args
  for i in range(start, end):
    a, b = iAmSlow(i)
    dump[i] = (a, b)
    print(dump[i], i)

incr = math.ceil(bins/buckets)
starts = [min(incr*j, bins) for j in range(buckets+1)]
arguments = [ (starts[i], starts[i+1]) for i in range(buckets)]
with Pool(buckets) as thrd:
  thrd.map(iAmParallelizable, arguments)

for i in range(len(dump)):
  print(dump[i])

1 Ответ

0 голосов
/ 19 октября 2019

Python multiprocessing имеет специальные контейнеры для совместного использования состояния между процессами . Вот модифицированный код:

from multiprocessing import Pool, Manager
import math

bins = 1000
buckets = 4 # number of threads

def iAmSlow(i):
  # I return a tuple, my return tuples are usually the same type
  # but there are some exceptions
  # (ie. double array of size 5, vs empty array)
  if i%13 == 0:
    return [], []
  return [k*i for k in range(5)], [k-i for k in range(5)]

def iAmParallelizable(args):
  start, end, dump = args
  for i in range(start, end):
    a, b = iAmSlow(i)
    dump[i] = (a, b)

with Manager() as manager:
  incr = math.ceil(bins/buckets)
  starts = [min(incr*j, bins) for j in range(buckets+1)]
  dump = manager.list([None for i in range(bins)])
  arguments = [ (starts[i], starts[i+1], dump) for i in range(buckets)]
  with Pool(buckets) as thrd:
    thrd.map(iAmParallelizable, arguments)

  for i in range(len(dump)):
    print(dump[i])
...