Оптимизируйте python программу для одновременного анализа двух больших файлов - PullRequest
1 голос
/ 06 марта 2020

Я пытаюсь проанализировать два больших файла с Python3 одновременно. Как показано здесь:

dict = {}
row = {}
with open(file1, "r") as f1, open(file2, "r") as f2:
  zipped = zip(f1, f2)
  for line_f1, line_f2 in zipped:
    # parse the lines and save the line information in a dictionary 
    row = {"ID_1":line_f1[0], "ID_2":line_f2[0], ...}

    # This process takes roughly 0.0005s each time
    # it parses each pair of lines at once and returns an output
    # it doesn't depend on previous lines or lines after
    output = process(row) 

    # output is a string, add it to dict
    if output in dict:
       dict[output] += 1
    else:
       dict[output] = 1
return dict

Когда я тестировал приведенный выше код с двумя текстовыми файлами меньшего размера (30 000 строк каждый, размер файла = 13M), и это занимает примерно 150 с до конечного значения sh l oop.

Когда я тестировал два больших текстовых файла (по 9 000 000 строк каждый, размер файла = 3,8 ГБ) без шага процесса в l oop, это занимает примерно 670 с.

Когда я тестировал те же два больших текстовых файла с шагом процесса. Я рассчитал, что для каждых 10000 предметов это займет примерно 60 с. Время не увеличивалось, когда количество итераций становилось большим.

Однако, когда я отправляю это задание в общий кластер, для обработки одной пары больших файлов требуется более 36 часов для завершения sh обработки. Я пытаюсь выяснить, есть ли другой способ обработки файлов, чтобы он мог быть быстрее. Мы ценим любые предложения.

Заранее спасибо!

1 Ответ

0 голосов
/ 06 марта 2020

Это всего лишь гипотеза, но ваш процесс может тратить свое выделенное место на ЦП каждый раз, когда запускает ввод / вывод для получения пары строк. Вы можете попробовать читать группы строк одновременно и обрабатывать их по частям, чтобы вы могли максимально использовать каждый временной интервал ЦП, который вы получаете в общем кластере.

from collections import deque
chunkSize = 1000000 # number of characters in each chunk (you will need to adjust this)
chunk1    = deque([""]) #buffered lines from 1st file
chunk2    = deque([""]) #buffered lines from 2nd file
with open(file1, "r") as f1, open(file2, "r") as f2:
    while chunk1 and chunk2:
        line_f1 = chunk1.popleft()
        if not chunk1:
            line_f1,*more = (line_f1+file1.read(chunkSize)).split("\n")
            chunk1.extend(more)
        line_f2 = chunk2.popleft()
        if not chunk2:
            line_f2,*more = (line_f2+file2.read(chunkSize)).split("\n")
            chunk2.extend(more)
        # process line_f1, line_f2
        ....

То, как это работает, заключается в чтении фрагмента символов (который должен быть больше вашей самой длинной строки) и разбиении его на строки. Строки помещаются в очередь для обработки.

Поскольку размер фрагмента выражается в количестве символов, последняя строка в очереди может быть неполной.

Чтобы убедиться, что строки завершены перед обработкой, другой блок считывается, когда мы добираемся до последней строки в очереди. Дополнительные символы добавляются в конец незавершенной строки, и в объединенной строке выполняется разбиение строки. Поскольку мы конкатенировали последнюю (неполную) строку, функция .split("\n") всегда применяется к фрагменту текста, который начинается на границе строки.

Процесс продолжается с (теперь завершенной) последней строки и остальных строки добавляются в очередь.

...