Как распараллелить итерацию по байтовому массиву? - PullRequest
0 голосов
/ 10 декабря 2018

Я пишу код для программы, которая будет работать с очень большими файлами с байтовыми данными (например, 4 ГБ для x = 2048, y = 2048, время = 1000 в приведенном ниже коде).В некоторых случаях это может быть до 16 ГБ файлов.Я думаю, что absolute_bytearray (data) можно ускорить как минимум в четыре раза с помощью многопроцессорной обработки (потому что при запуске программы загружается только около 28% ЦП):

КакМногопоточность Операция внутри цикла в Python

Как правильно применить многопроцессорную обработку для моего кода?

from time import perf_counter
from random import getrandbits

x = 512
y = 512
time = 200

xyt = x*y*time

my_by = bytearray(getrandbits(8) for x in range(xyt))

def absolute_bytearray(data):
    for i in range(len(data)):
        if data[i] > 127:
            data[i] = 255 - data[i]
    return data

start = perf_counter()
absolute_bytearray(my_by)
end = perf_counter()
print('time abs my_by = %.2f' % (end - start))  # around 6,70s for 512*512*200

Или, может быть, вы знаете более быстрое решение?

1 Ответ

0 голосов
/ 12 декабря 2018

Поскольку вы оперируете здесь данными большого объема, использование общей памяти было бы хорошим вариантом для сохранения минимального объема памяти при распараллеливании задания.Модуль multiprocessing ia предлагает Array для этого случая:

многопроцессорная обработка. Массив (typecode_or_type, size_or_initializer, *, lock = True)

Возвращает массив ctypes, выделенный из общей памяти.По умолчанию возвращаемое значение фактически является синхронизированной оболочкой для массива. документы

Приведенный ниже код также использует несколько процессов для создания данных.Пожалуйста, получите код для модуля mp_utils из моего ответа здесь .Эти две функции предназначены для создания «справедливых» диапазонов по индексам вашего общего массива.Эти batch_ranges отправляются рабочим процессам, и каждый процесс будет работать с общим массивом по индексам, содержащимся в этих диапазонах.

import random
import ctypes
from time import perf_counter
from multiprocessing import Process, Array

from mp_utils import calc_batch_sizes, build_batch_ranges


def f(data, batch_range):
    """Target processing function."""
    for i in batch_range:
        if data[i] > 127:
            data[i] = 255 - data[i]


def create_data(array, batch_range):
    """Fill specified range of array with random bytes."""
    rd = random.Random(42)  # arbitrary seed 42
    getrandbits = rd.getrandbits  # for speed
    for i in batch_range:
        array[i] = getrandbits(8)


def process_tasks(target, tasks):
    """Process tasks by starting a new process per task."""
    pool = [Process(target=target, args=task) for task in tasks]

    for p in pool:
        p.start()
    for p in pool:
        p.join()


def main(x, y, time, n_workers):

    xyt = x * y * time

    # creating data
    creation_start = perf_counter()  # ----------------------------------------
    # We don't need a lock here, because our processes operate on different
    # subsets of the array.
    sha = Array(ctypes.c_ubyte, xyt, lock=False)  # initialize zeroed array
    batch_ranges = build_batch_ranges(calc_batch_sizes(len(sha), n_workers))
    tasks = [*zip([sha] * n_workers, batch_ranges)]

    process_tasks(target=create_data, tasks=tasks)
    print(f'elapsed for creation: {perf_counter() - creation_start:.2f} s')  #-
    print(sha[:30])

    # process data
    start = perf_counter()  # -------------------------------------------------
    process_tasks(target=f, tasks=tasks)
    print(f'elapsed for processing: {perf_counter() - start:.2f} s')  # -------
    print(sha[:30])


if __name__ == '__main__':

    N_WORKERS = 8
    X = Y = 512
    TIME = 200

    main(X, Y, TIME, N_WORKERS)

Пример вывода:

elapsed for creation: 5.31 s
[163, 28, 6, 189, 70, 62, 57, 35, 188, 26, 173, 189, 228, 139, 22, 151, 108, 8, 7, 23, 55, 59, 129, 154, 6, 143, 50, 183, 166, 179]
elapsed for processing: 4.36 s
[92, 28, 6, 66, 70, 62, 57, 35, 67, 26, 82, 66, 27, 116, 22, 104, 108, 8, 7, 23, 55, 59, 126, 101, 6, 112, 50, 72, 89, 76]

Process finished with exit code 0

I'mзапуск этого на машине SandyBridge (2012), 8 ядер (4 Hyper-Threading), Ubuntu 18.04.

Ваш серийный исходный код получает:

elapsed for creation: 22.14 s
elapsed for processing: 16.78 s

Таким образом, я получаю примерно четырехкратное ускорение с моим кодом (примерно столько, сколько на моей машине реальноcores).

Эти цифры приведены для данных размером 50 МБ (512x512x200).Я также проверил с 4 ГиБ (2048x2048x1000), время улучшилось соответственно с 1500 с (последовательный) до 366 с (параллельный).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...