Как эффективно запускать параллельные процессы в python, при этом каждый должен читать из большого файла - PullRequest
0 голосов
/ 05 мая 2020

У меня есть большой файл protobuf с разделителями (может быть от 1 до 30 ГБ). Каждое сообщение в файле имеет определенный формат, где первый атрибут - это строка, а второй - повторяющийся (наподобие списка) объект, содержащий 2 атрибута.

Это похоже на это текстовое представление

БОЛЬШОЙ ФАЙЛ:

first 10:32, 12:1, ... ,100002:3
second 1:3, 15:5, ... ,548756:57
...
...
ten_million 4:7, 48:4, ... ,12357458:8

В настоящее время мой код выглядит примерно так:

import itertools
from multiprocessing import Pool
from google.protobuf.internal.decoder import _DecodeVarint32
import proto_pb2


def read_message(buffer, n, message_type):
    message = message_type()
    msg_len, new_pos = _DecodeVarint32(buffer, n)
    n = new_pos
    msg_buf = buffer[n:n + msg_len]
    n += msg_len
    message.ParseFromString(msg_buf)
    return message


class A:
    def __init__(self, big_file):
        with open(big_file, 'rb') as fp:
            self.buf = fp.read()

    def get_line(self, n):
        return read_message(self.buf, n, proto_pb2.line_type)


def func(obj_a, lines):
    res = []
    for line in lines:
        res.append(obj_a.get_line(line))
    return res


if __name__ == '__main__':
    all_lines = [[54487, 78, 10025, 548967], [12, 3218], [45786, 5744, 567, 45648], [45156, 456, 75]]
    a = A(big_file)
    with Pool() as pool:
        result = pool.starmap(func, itertools.product([a], all_lines))
    print(result)

Я открываю и читаю файл внутри класса и удерживаю его после создания объекта класса, чтобы избежать множественные открытия / закрытия файла. Он умещается в памяти, но я бы хотел этого избежать.

Затем я ожидаю, что каждый из процессов, созданных пулом, прочитает нужные ему строки из файла и вернет желаемый результат. Все подпроцессы будут только читать из файла (без записи) и печатать / возвращать результаты. В настоящее время он действительно не работает параллельно, мне кажется, что он долго ожидает получения блокировки, прежде чем каждый процесс сможет запуститься. Я предполагаю, что это происходит из-за большого размера файла, который копируется для каждого процесса.

Какая будет правильная реализация?

Этот код используется только для Например, фактический файл - это файл protobuf (надеюсь, я не сделал там много ошибок), и я сохраняю отображение (dict) номеров строк и их местоположения в файле

...