У меня есть программа, которая сильно ограничена процессором и (теоретически) распараллеливается. Задача состоит в том, чтобы вызвать метод (small_workload
) объекта, изменяющий состояние объекта, и сделать это для пакета / списка таких объектов. Суть в том, что рабочая нагрузка на вызов относительно мала ; однако состояние объекта относительно велико . Это приводит к нежелательным накладным расходам в IP C и делает многоядерную реализацию значительно (!) Медленнее по сравнению с одноядерной реализацией.
Так что мой вопрос кажется совершенно невинным: как вы go об уменьшении этого IP C?
Вот минимально рабочий пример, который воспроизводит проблему:
import multiprocessing as mp
import time
class Element(object):
def __init__(self, num_items):
self.large_attribute = [0] * num_items
def small_workload(self, pos):
self.large_attribute[pos] += 1
def init_helper(obj, obj_size):
return obj(obj_size)
def work_helper(obj, pos):
obj.small_workload(pos)
return obj
def test_obj_list(obj_list, num_iter):
# test that order has not been scrambled
for obj_idx, obj in enumerate(obj_list):
for attr_idx, attribute in enumerate(obj.large_attribute):
if obj_idx == attr_idx:
assert attribute == num_iter
else:
assert attribute == 0
def execute_work(num_obj=1, num_workers=0, num_extra_payload=0, num_iter=200):
obj_size = num_obj + num_extra_payload
if num_workers == 0:
# create the objects
obj_list = [obj for obj in map(init_helper,
[Element] * num_obj,
[obj_size] * num_obj)]
# process objects
for _ in range(num_iter):
obj_idx = range(num_obj) # helper for neat code
obj_list = [obj for obj in map(work_helper, obj_list, obj_idx)]
# assume some more work here that prevents the for from being
# parallelized
else:
with mp.Pool(processes=num_workers) as workers:
# create the objects
obj_list = workers.starmap(init_helper, zip([Element] * num_obj,
[obj_size] * num_obj))
for _ in range(num_iter):
obj_idx = range(num_obj) # helper for neat code
obj_list = workers.starmap(work_helper, zip(obj_list, obj_idx))
# assume some more work here that prevents the for from being
# parallelized
return obj_list
if __name__ == "__main__":
start_time = time.time()
obj_list = execute_work(1000, 0)
total_time = time.time() - start_time
test_obj_list(obj_list, 200)
print("Single Worker Time: " + str(total_time))
start_time = time.time()
obj_list = execute_work(1000, num_workers=3)
total_time = time.time() - start_time
test_obj_list(obj_list, 200)
print("3 Workers Time: " + str(total_time))
И вот мои тайминги (Win10, Intel Xeon E3-1230 @ 3.30GHz ):
Single Worker Time: 0.06699895858764648
3 Workers Time: 11.602166652679443
Обратите внимание, что я не контролирую внутреннюю структуру объекта ; Я знаю только, что у него есть метод small_workload
, который он наследует от родителя; однако неизвестно, как этот метод выполняет работу. Однако результирующая рабочая нагрузка всегда относительно невелика.
Я могу концептуально разделить состояние объекта на две части. Неизменяемая часть, на которую никогда не влияет small_workload
(может быть изменена с помощью num_extra_payload
в mwe), и изменяющаяся часть, которая не изменяется текущим вызовом, но будет изменена другими рабочими, или последующие вызовы (первые num_obj
элементов в mwe).
Я создал два графика (усредненные по 5 прогонам), которые показывают, как время обработки масштабируется с количеством объектов и количеством рабочих.
Как видите, многоядерная производительность становится хуже (!), Чем больше доступно ядер / рабочих, и линейно масштабируется при неизменных накладных расходах и экспоненциально при динамических c накладных расходах. Как это можно оптимизировать для лучшего масштабирования с увеличением количества ядер?