Как распараллелить итерацию по диапазону, используя StdLib и Python 3? - PullRequest
0 голосов
/ 04 октября 2018

Я искал ответ по этому вопросу уже несколько дней, но безрезультатно.Я, вероятно, просто не понимаю фрагменты, которые распространяются вокруг, и документация Python для модуля multiprocessing довольно большая и неясная для меня.

Скажем, у вас есть следующее для цикла:

import timeit


numbers = []

start = timeit.default_timer()

for num in range(100000000):
    numbers.append(num)

end = timeit.default_timer()

print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

Вывод:

TIME: 23.965870224497916 seconds
SUM: 4999999950000000

Для этого примера, скажем, у вас есть 4-ядерный процессор.Есть ли способ создать в общей сложности 4 процесса, где каждый процесс выполняется на отдельном ядре ЦП и завершается примерно в 4 раза быстрее, поэтому 24 с / 4 процесса = ~ 6 секунд?

Каким-то образом разделите цикл for на 4равные чанки, а затем добавить 4 чанков в список чисел для приравнивания к одной и той же сумме?Был такой поток в стеке: Parallel Simple For Loop , но я его не понимаю.Спасибо всем.

Ответы [ 3 ]

0 голосов
/ 04 октября 2018

Да, это выполнимо.Ваш расчет не зависит от промежуточных результатов, поэтому вы можете легко разделить задачу на куски и распределить ее по нескольким процессам.Это то, что называется

смущающе параллельной задачей .

Единственная сложная часть здесь может быть разделить диапазон на довольно равные части впервое место.Скажу прямо в моей личной библиотеке две функции, чтобы справиться с этим:

# mp_utils.py

from itertools import accumulate

def calc_batch_sizes(n_tasks: int, n_workers: int) -> list:
    """Divide `n_tasks` optimally between n_workers to get batch_sizes.

    Guarantees batch sizes won't differ for more than 1.

    Example:
    # >>>calc_batch_sizes(23, 4)
    # Out: [6, 6, 6, 5]

    In case you're going to use numpy anyway, use np.array_split:
    [len(a) for a in np.array_split(np.arange(23), 4)]
    # Out: [6, 6, 6, 5]
    """
    x = int(n_tasks / n_workers)
    y = n_tasks % n_workers
    batch_sizes = [x + (y > 0)] * y + [x] * (n_workers - y)

    return batch_sizes


def build_batch_ranges(batch_sizes: list) -> list:
    """Build batch_ranges from list of batch_sizes.

    Example:
    # batch_sizes [6, 6, 6, 5]
    # >>>build_batch_ranges(batch_sizes)
    # Out: [range(0, 6), range(6, 12), range(12, 18), range(18, 23)]
    """
    upper_bounds = [*accumulate(batch_sizes)]
    lower_bounds = [0] + upper_bounds[:-1]
    batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)]

    return batch_ranges

Тогда ваш главный скрипт будет выглядеть так:

import time
from multiprocessing import Pool
from mp_utils import calc_batch_sizes, build_batch_ranges


def target_foo(batch_range):
    return sum(batch_range)  # ~ 6x faster than target_foo1


def target_foo1(batch_range):
    numbers = []
    for num in batch_range:
        numbers.append(num)
    return sum(numbers)


if __name__ == '__main__':

    N = 100000000
    N_CORES = 4

    batch_sizes = calc_batch_sizes(N, n_workers=N_CORES)
    batch_ranges = build_batch_ranges(batch_sizes)

    start = time.perf_counter()
    with Pool(N_CORES) as pool:
        result = pool.map(target_foo, batch_ranges)
        r_sum = sum(result)
    print(r_sum)
    print(f'elapsed: {time.perf_counter() - start:.2f} s')

Обратите внимание, что я также переключил ваш цикл for дляпростая сумма по объекту диапазона, так как он предлагает гораздо лучшую производительность.Если вы не можете сделать это в своем реальном приложении, понимание списка все равно будет на ~ 60% быстрее, чем заполнение списка вручную, как в вашем примере.

Пример вывода:

4999999950000000
elapsed: 0.51 s

Process finished with exit code 0
0 голосов
/ 04 октября 2018

Я сделал сравнение, время, необходимое для разделения задач, иногда может занять больше времени:

Файл multiprocessing_summation.py:

def summation(lst):
  sum = 0
  for x in range(lst[0], lst[1]):
    sum += x
  return sum

Файл multiprocessing_summation_master.py:

%%file ./examples/multiprocessing_summation_master.py
import multiprocessing as mp
import timeit
import os
import sys
import multiprocessing_summation as mps

if __name__ == "__main__":

  if len(sys.argv) == 1:
    print(f'{sys.argv[0]} <number1 ...>')
    sys.exit(1)
  else:
    args = [int(x) for x in sys.argv[1:]]

  nBegin = 1
  nCore = os.cpu_count()

  for nEnd in args:

    ### Approach 1  ####
    ####################
    start = timeit.default_timer()
    answer1 = mps.summation((nBegin, nEnd+1))
    end = timeit.default_timer()
    print(f'Answer1 = {answer1}')
    print(f'Time taken = {end - start}')

    ### Approach 2 ####
    ####################
    start = timeit.default_timer()
    lst = []
    for x in range(nBegin, nEnd, int((nEnd-nBegin+1)/nCore)):
      lst.append(x)
    lst.append(nEnd+1)

    lst2 = []
    for x in range(1, len(lst)):
      lst2.append((lst[x-1], lst[x]))

    with mp.Pool(processes=nCore) as pool:
      answer2 = pool.map(mps.summation, lst2)
    end = timeit.default_timer()
    print(f'Answer2 = {sum(answer2)}')
    print(f'Time taken = {end - start}')

Запустите второй скрипт:

python multiprocessing_summation_master.py 1000 100000 10000000 1000000000

Выходные данные:

Answer1 = 500500
Time taken = 4.558405389566795e-05
Answer2 = 500500
Time taken = 0.15728066685459452
Answer1 = 5000050000
Time taken = 0.005781152051264199
Answer2 = 5000050000
Time taken = 0.14532123447452705
Answer1 = 50000005000000
Time taken = 0.4903863230334036
Answer2 = 50000005000000
Time taken = 0.49744346392131533
Answer1 = 500000000500000000
Time taken = 50.825169837068
Answer2 = 500000000500000000
Time taken = 26.603663061636567
0 голосов
/ 04 октября 2018
import timeit

from multiprocessing import Pool

def appendNumber(x):
    return x

start = timeit.default_timer()

with Pool(4) as p:
    numbers = p.map(appendNumber, range(100000000))

end = timeit.default_timer()

print('TIME: {} seconds'.format(end - start))
print('SUM:', sum(numbers))

Так что Pool.map похоже на встроенную функцию map.Он принимает функцию и итерируемую и создает список результатов вызова этой функции для каждого элемента итерируемой.Здесь, поскольку мы на самом деле не хотим изменять элементы в итерируемом диапазоне, мы просто возвращаем аргумент.

Важно то, что Pool.map делит предоставленную итерацию (range(1000000000) здесь) на куски иотправляет их на количество процессов, которые оно имеет (определено здесь как 4 в Pool(4)), затем объединяет результаты обратно в один список.

Вывод, который я получаю при запуске, это

TIME: 8.748245699999984 seconds
SUM: 4999999950000000
...