Куски данных из большого файла для многопроцессорной обработки? - PullRequest
16 голосов
/ 03 января 2012

Я пытаюсь распараллелить приложение, используя многопроцессорную обработку, которая принимает очень большой CSV-файл (от 64 до 500 МБ), выполняет некоторую работу построчно, а затем выводит небольшой файл фиксированного размера.

В настоящее время я делаю list(file_obj), который, к сожалению, полностью загружается в память (я думаю), и затем я разбиваю этот список на n частей, n - количество процессов, которые я хочу запустить.Затем я делаю pool.map() в разбитых списках.

Кажется, что это действительно очень плохое время выполнения по сравнению с однопоточной методологией "просто открыть файл и повторить по нему".Может кто-нибудь предложить лучшее решение?

Кроме того, мне нужно обработать строки файла в группах, которые сохраняют значение определенного столбца.Эти группы строк сами могут быть разделены, но ни одна группа не должна содержать более одного значения для этого столбца.

Ответы [ 2 ]

15 голосов
/ 03 января 2012

list(file_obj) может потребовать много памяти, если fileobj велико.Мы можем уменьшить эту потребность в памяти, используя itertools для извлечения фрагментов строк по мере необходимости.

В частности, мы можем использовать

reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)

, чтобы разбить файл на обрабатываемые куски, и

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)

, чтобы многопроцессорный пул работал на num_chunks кускахза один раз.

Для этого нам понадобится примерно только достаточно памяти, чтобы хранить в памяти несколько (num_chunks) кусков, а не весь файл.


import multiprocessing as mp
import itertools
import time
import csv

def worker(chunk):
    # `chunk` will be a list of CSV rows all with the same name column
    # replace this with your real computation
    # print(chunk)
    return len(chunk)  

def keyfunc(row):
    # `row` is one row of the CSV file.
    # replace this with the name column.
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'test.dat'
    num_chunks = 10
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print(results)

if __name__ == '__main__':
    main()
2 голосов
/ 03 января 2012

Я бы сказал проще.Пусть одна программа откроет файл и прочитает его построчно.Вы можете выбрать, на сколько файлов разбить его, открыть столько выходных файлов и каждую строку записать в следующий файл.Это разделит файл на n равных частей.Затем вы можете запустить программу Python для каждого из файлов параллельно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...