Как распараллелить итерацию по большому файлу байтового типа? - PullRequest
0 голосов
/ 28 января 2019

Что у меня есть: файл байтов до 16 Гбайт со смещением (например, 100 байт).Что мне нужно: обработать действие "f" в коде самым быстрым способом, например, я надеюсь с многопроцессорной обработкой.

Я пытался реализовать http://effbot.org/zone/wide-finder.htm этот подход.Многопоточное решение Python из этой статьи было в два раза медленнее, чем оригинальный код.Многопроцессорное решение Python, которое я не смог реализовать, потому что мой уровень Python недостаточно хорош.Я прочитал описание модуля многопроцессорной обработки, но это не помогло мне, у меня возникли некоторые проблемы с кодом ...

from time import perf_counter
from random import getrandbits


def create_byte_data(size):
    creation_start = perf_counter()
    my_by = bytes(getrandbits(8) for i in range(size))  # creates 50MB random byte data
    print('creation my_by time = %.1f' % (perf_counter() - creation_start))
    return my_by


def write_to_file(file, data, b):
    writing_start = perf_counter()
    with open(file, "wb") as f:  # binary file creation
        for a in range(offset):
            f.write(b'0')
        # for n in range(b):  # for creating bigger files
        #     f.write(data)
        f.write(data)
    print('writing time = %.1f' % (perf_counter() - writing_start))


def abs_pixel(pixel: bytes) -> int:  # converting signed bytes to absolute (0 +127) values, and collection sum of them to "result"
    result = 0
    for a in pixel:
        if a > 127:
            result += 256 - a
        else:
            result += a
    return result    


def f(file, offset, time):  # this function must be accelerated
    sum_list = list()
    with open(file, "rb") as f:
        f.seek(offset)
        while True:
            chunk = f.read(time)
            if not chunk:
                break
            sum_list.append(abs_pixel(chunk))
    return sum_list


if __name__ == '__main__':
    filename = 'bytes.file'
    offset = 100
    x = 512
    y = 512
    time = 200
    fs = 2  # file size in GBytes  # for creating bigger files
    xyt = x * y * time
    b = fs*1024*1024*1024//xyt  # parameter for writing data file of size 'fs'
    my_data = create_byte_data(xyt)  # don't needed after created ones
    write_to_file(filename, my_data, b)  # don't needed after created ones
    start = perf_counter()
    result = f(filename, offset, time)  # this function must be accelerated
    print('function time = %.1f' % (perf_counter() - start))
    print(result[:10])

Задача: выполнить некоторые математические операции с чанком (который имеет длину "время") исобирать результаты в список.Файлы могут быть очень большими, поэтому ОЗУ не должно быть перегружено.Приведенный выше код может создать случайный файл байтов (50 Мб для начала или больше для дальнейших тестов).Я ожидаю по крайней мере 4-кратное ускорение для запуска функции "F" по сравнению с кодом выше.На самом деле на моем компьютере требуется около 6 секунд для файла размером 50 МБ и около 240 секунд для файла размером 2 ГБ.

1 Ответ

0 голосов
/ 22 февраля 2019

Я нашел, как распараллелить некоторую часть кода с помощью многопроцессорной обработки, и как заставить abs_pixel работать быстрее.Теперь код работает в 2 раза быстрее (например, 6,1 с против 11,9 с при 100 МБ на моем ПК или 119 с 246 с при 2 ГБ).

from multiprocessing import Pool
from struct import unpack_from


def abs_pixel_2(pixel: bytes) -> int:  # integral abs
    a = unpack_from('<%ib' % len(pixel), pixel)
    return sum(map(abs, a))


def f_mp(file, offset, time):  # read in lines
    sum_list = list()
    p = Pool()
    with open(file, "rb") as f:
        f.seek(offset)
        for z in range(y*2):  # (y*2) for 100 MByte file (2 times original 50 MBytes created data)
            line = list()
            for i in range(x):  # read a line
                pixel = f.read(time)
                line.append(pixel)
            sums_line = p.map(abs_pixel_2, line)  # line with sums
            # sums_line = list(map(abs_pixel, line))  # line with sums without using of Pool
            sum_list.append(sums_line)  # -> list of lines (lists)
        sum_list = [item for sublist in sum_list for item in sublist]  # flatten of list
        p.close()
        p.join()
    return sum_list

Но я все еще надеюсь найти больше точек ускорения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...