Многопоточность при чтении и обработке огромного файла (слишком большого для памяти) - PullRequest
0 голосов
/ 05 февраля 2019

У меня есть следующий код, который работает очень медленно.Это программа, которая разбивает большой файл (80 гигабайт) и помещает его в древовидную структуру папок для быстрого поиска.Я сделал несколько комментариев в коде, чтобы помочь вам понять его.

# Libraries
import os


# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth


# Preperations
os.makedirs(outputdirectory)

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()


# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
    for line in infile:
        pipeline(line)

Есть ли способ заставить многопоточность работать?Поскольку я сам попробовал несколько примеров, я нашел его в Интернете, и он поместил все в память, заставляя мой компьютер несколько раз зависать.

1 Ответ

0 голосов
/ 05 февраля 2019

Во-первых, самое простое решение (IMO)

Если, как кажется, строки полностью независимы, просто разбейте ваш файл на N фрагментов, передайте имя файла, чтобы открыть его в качестве аргумента программы, и запустите несколько экземпляров.вашего текущего скрипта, запускающего их вручную в нескольких командных строках.

Плюсы:

  • Нет необходимости вникать в многопроцессорность, межпроцессное взаимодействие и т. д.
  • дел.не нужно слишком сильно изменять код

Минусы:

  • Вам необходимо предварительно обработать большой файл, разбив его на куски (хотя это будет намного быстрее, чем ваш текущий)время выполнения, поскольку у вас не будет сценария открытия-закрытия-на-строке)
  • Вам необходимо запустить процессы самостоятельно, передав соответствующее имя файла для каждого из них

Это будет реализовано следующим образом:

Предварительная обработка:

APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
  chunk_id = 0
  next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
  while next_chunk:
    with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
      ofp.writelines(next_chunk)
    chunk_id += 1
    next_chunk = fp.readlines(APPROX_CHUNK_SIZE)

Из readlines документов :

Если необязательный аргумент sizehintприсутствует вместо чтениявплоть до EOF считываются целые строки с общим размером байтов sizehint (возможно, после округления до внутреннего размера буфера).

Выполнение этого способа не обеспечит четное количество строк во всех чанках,но сделает предварительную обработку намного быстрее, так как вы читаете по блокам, а не построчно.Адаптируйте размер куска по мере необходимости.Также обратите внимание, что с помощью readlines мы можем быть уверены, что у нас не будет разрывов строк между кусками, но поскольку функция возвращает список строк, мы используем writelines, чтобы записать это в наш выходной файл (что эквивалентнозациклите список и ofp.write(line)).Для полноты позвольте мне отметить, что вы также можете объединить все строки в памяти и вызвать write только один раз (т. Е. Сделать ofp.write(''.join(next_chunk))), что может принести вам (незначительное) повышение производительности, выплачиваемое (много) более интенсивное использование ОЗУ.

Основной сценарий:

Единственные модификации, которые вам нужны, находятся в самом верху:

import sys
file=sys.argv[1]
... # rest of your script here

При использовании argv вы можете передать командуСтрока аргументов вашей программы (в данном случае файл, который нужно открыть).Затем просто запустите ваш скрипт как:

python process_the_file.py big_file_0.txt

Это запустит один процесс.Откройте несколько терминалов и выполните одну команду с big_file_N.txt для каждого, и они будут независимы друг от друга.

Примечание : я использую argv[1], потому что для всех программ первое значениеargv (т. е. argv[0]) - это всегда имя программы.


Затем решение multiprocessing

Хотя эффективное, первое решение не совсем элегантно,тем более что у вас будет 80 файлов, если вы начнете с файла размером 80 ГБ.

Более чистое решение - использовать модуль multiprocessing python (важно: НЕ threading! Если вы этого не сделаетеузнайте разницу, найдите «глобальную блокировку интерпретатора» и узнайте, почему многопоточность в python не работает так, как вы думаете).

Идея состоит в том, чтобы иметь один процесс «продюсера», который открывает большой файл инепрерывно ставит строки из него в очередь.Затем пул «потребительских» процессов, которые извлекают из очереди строки и выполняют обработку.

Плюсы:

  • Один скрипт делает все
  • Нет необходимостичтобы открыть несколько терминалов и набрать

Минусы:

  • Сложность
  • использует межпроцессное взаимодействие, которое имеет некоторые издержки

Это будет реализовано следующим образом:

# Libraries
import os
import multiprocessing

outputdirectory="sorted"
depth=4 # This is the tree depth

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()

if __name__ == '__main__':
    # Variables
    file="80_gig_file.txt"

    # Preperations
    os.makedirs(outputdirectory)
    pool = multiprocessing.Pool() # by default, 1 process per CPU
    LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM

    with open(file) as infile:
        next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
        pool.close()
        pool.join()

Строка if __name__ == '__main__' является барьером для отделения кода, который выполняется в каждом процессе, от кода, который выполняется только в «отце».Каждый процесс определяет pipeline, но только отец фактически порождает пул рабочих и применяет эту функцию.Вы найдете более подробную информацию о multiprocessing.map здесь

Редактировать:

Добавлено закрытие и присоединение к пулу, чтобы предотвратить выход из основного процесса и уничтожение дочерних процессов..

...