Как уменьшить IP C в многопроцессорной карте пула / starmap? - PullRequest
0 голосов
/ 09 мая 2020

У меня есть программа, которая сильно ограничена процессором и (теоретически) распараллеливается. Задача состоит в том, чтобы вызвать метод (small_workload) объекта, изменяющий состояние объекта, и сделать это для пакета / списка таких объектов. Суть в том, что рабочая нагрузка на вызов относительно мала ; однако состояние объекта относительно велико . Это приводит к нежелательным накладным расходам в IP C и делает многоядерную реализацию значительно (!) Медленнее по сравнению с одноядерной реализацией.

Так что мой вопрос кажется совершенно невинным: как вы go об уменьшении этого IP C?

Вот минимально рабочий пример, который воспроизводит проблему:

import multiprocessing as mp
import time


class Element(object):
    def __init__(self, num_items):
        self.large_attribute = [0] * num_items

    def small_workload(self, pos):
        self.large_attribute[pos] += 1


def init_helper(obj, obj_size):
    return obj(obj_size)


def work_helper(obj, pos):
    obj.small_workload(pos)
    return obj


def test_obj_list(obj_list, num_iter):
    # test that order has not been scrambled
    for obj_idx, obj in enumerate(obj_list):
        for attr_idx, attribute in enumerate(obj.large_attribute):
            if obj_idx == attr_idx:
                assert attribute == num_iter
            else:
                assert attribute == 0


def execute_work(num_obj=1, num_workers=0, num_extra_payload=0, num_iter=200):
    obj_size = num_obj + num_extra_payload

    if num_workers == 0:
        # create the objects
        obj_list = [obj for obj in map(init_helper,
                                       [Element] * num_obj,
                                       [obj_size] * num_obj)]

        # process objects
        for _ in range(num_iter):
            obj_idx = range(num_obj)  # helper for neat code
            obj_list = [obj for obj in map(work_helper, obj_list, obj_idx)]
            # assume some more work here that prevents the for from being
            # parallelized

    else:
        with mp.Pool(processes=num_workers) as workers:
            # create the objects
            obj_list = workers.starmap(init_helper, zip([Element] * num_obj,
                                                        [obj_size] * num_obj))

            for _ in range(num_iter):
                obj_idx = range(num_obj)  # helper for neat code
                obj_list = workers.starmap(work_helper, zip(obj_list, obj_idx))
                # assume some more work here that prevents the for from being
                # parallelized

    return obj_list


if __name__ == "__main__":
    start_time = time.time()
    obj_list = execute_work(1000, 0)
    total_time = time.time() - start_time
    test_obj_list(obj_list, 200)
    print("Single Worker Time: " + str(total_time))

    start_time = time.time()
    obj_list = execute_work(1000, num_workers=3)
    total_time = time.time() - start_time
    test_obj_list(obj_list, 200)
    print("3 Workers Time: " + str(total_time))


И вот мои тайминги (Win10, Intel Xeon E3-1230 @ 3.30GHz ):

Single Worker Time: 0.06699895858764648
3 Workers Time: 11.602166652679443

Обратите внимание, что я не контролирую внутреннюю структуру объекта ; Я знаю только, что у него есть метод small_workload, который он наследует от родителя; однако неизвестно, как этот метод выполняет работу. Однако результирующая рабочая нагрузка всегда относительно невелика.

Я могу концептуально разделить состояние объекта на две части. Неизменяемая часть, на которую никогда не влияет small_workload (может быть изменена с помощью num_extra_payload в mwe), и изменяющаяся часть, которая не изменяется текущим вызовом, но будет изменена другими рабочими, или последующие вызовы (первые num_obj элементов в mwe).

Я создал два графика (усредненные по 5 прогонам), которые показывают, как время обработки масштабируется с количеством объектов и количеством рабочих.

Overhead introduced by unchanging parts of the object Overhead introduced by changing parts of the object

Как видите, многоядерная производительность становится хуже (!), Чем больше доступно ядер / рабочих, и линейно масштабируется при неизменных накладных расходах и экспоненциально при динамических c накладных расходах. Как это можно оптимизировать для лучшего масштабирования с увеличением количества ядер?

...