Поскольку вы оперируете здесь данными большого объема, использование общей памяти было бы хорошим вариантом для сохранения минимального объема памяти при распараллеливании задания.Модуль multiprocessing
ia предлагает Array
для этого случая:
многопроцессорная обработка. Массив (typecode_or_type, size_or_initializer, *, lock = True)
Возвращает массив ctypes, выделенный из общей памяти.По умолчанию возвращаемое значение фактически является синхронизированной оболочкой для массива. документы
Приведенный ниже код также использует несколько процессов для создания данных.Пожалуйста, получите код для модуля mp_utils
из моего ответа здесь .Эти две функции предназначены для создания «справедливых» диапазонов по индексам вашего общего массива.Эти batch_ranges
отправляются рабочим процессам, и каждый процесс будет работать с общим массивом по индексам, содержащимся в этих диапазонах.
import random
import ctypes
from time import perf_counter
from multiprocessing import Process, Array
from mp_utils import calc_batch_sizes, build_batch_ranges
def f(data, batch_range):
"""Target processing function."""
for i in batch_range:
if data[i] > 127:
data[i] = 255 - data[i]
def create_data(array, batch_range):
"""Fill specified range of array with random bytes."""
rd = random.Random(42) # arbitrary seed 42
getrandbits = rd.getrandbits # for speed
for i in batch_range:
array[i] = getrandbits(8)
def process_tasks(target, tasks):
"""Process tasks by starting a new process per task."""
pool = [Process(target=target, args=task) for task in tasks]
for p in pool:
p.start()
for p in pool:
p.join()
def main(x, y, time, n_workers):
xyt = x * y * time
# creating data
creation_start = perf_counter() # ----------------------------------------
# We don't need a lock here, because our processes operate on different
# subsets of the array.
sha = Array(ctypes.c_ubyte, xyt, lock=False) # initialize zeroed array
batch_ranges = build_batch_ranges(calc_batch_sizes(len(sha), n_workers))
tasks = [*zip([sha] * n_workers, batch_ranges)]
process_tasks(target=create_data, tasks=tasks)
print(f'elapsed for creation: {perf_counter() - creation_start:.2f} s') #-
print(sha[:30])
# process data
start = perf_counter() # -------------------------------------------------
process_tasks(target=f, tasks=tasks)
print(f'elapsed for processing: {perf_counter() - start:.2f} s') # -------
print(sha[:30])
if __name__ == '__main__':
N_WORKERS = 8
X = Y = 512
TIME = 200
main(X, Y, TIME, N_WORKERS)
Пример вывода:
elapsed for creation: 5.31 s
[163, 28, 6, 189, 70, 62, 57, 35, 188, 26, 173, 189, 228, 139, 22, 151, 108, 8, 7, 23, 55, 59, 129, 154, 6, 143, 50, 183, 166, 179]
elapsed for processing: 4.36 s
[92, 28, 6, 66, 70, 62, 57, 35, 67, 26, 82, 66, 27, 116, 22, 104, 108, 8, 7, 23, 55, 59, 126, 101, 6, 112, 50, 72, 89, 76]
Process finished with exit code 0
I'mзапуск этого на машине SandyBridge (2012), 8 ядер (4 Hyper-Threading), Ubuntu 18.04.
Ваш серийный исходный код получает:
elapsed for creation: 22.14 s
elapsed for processing: 16.78 s
Таким образом, я получаю примерно четырехкратное ускорение с моим кодом (примерно столько, сколько на моей машине реальноcores).
Эти цифры приведены для данных размером 50 МБ (512x512x200).Я также проверил с 4 ГиБ (2048x2048x1000), время улучшилось соответственно с 1500 с (последовательный) до 366 с (параллельный).