Как я могу распараллелить конвейер генераторов / итераторов в Python? - PullRequest
7 голосов
/ 16 апреля 2011

Предположим, у меня есть некоторый код Python, подобный следующему:

input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

Этот код читает каждую строку из входного файла, пропускает ее через несколько функций и записывает вывод в выходной файл. Теперь I знает, что функции process_line, process_item и generate_output_line никогда не будут мешать друг другу, и давайте предположим, что входные и выходные файлы находятся на отдельных дисках, так что чтение и запись не будут мешать друг другу.

Но, вероятно, Python ничего об этом не знает. Насколько я понимаю, Python будет читать одну строку, применять каждую функцию по очереди и записывать результат в вывод, а затем он будет читать вторую строку только после отправки первой строки на вывод, так что вторая строка не входит в конвейер, пока не выйдет первая. Я правильно понимаю, как эта программа будет течь? Если это так, то есть ли простой способ сделать так, чтобы несколько конвейеров могли быть в конвейере одновременно, чтобы программа считывала, записывала и обрабатывала каждый шаг параллельно?

1 Ответ

5 голосов
/ 16 апреля 2011

Вы не можете распараллелить чтение или запись в файлы;в конечном итоге это будет вашим узким местом.Вы уверены , что узким местом здесь является процессор, а не ввод / вывод?

Поскольку ваша обработка не содержит зависимостей (по вашему мнению), тривиально просто использовать многопроцессорность Python.Класс пула .

Есть несколько способов написать это, но проще отладить поиск независимых критических путей (самая медленная часть кода), которые мы сделаем параллельными.Давайте предположим, что это process_item.

… И это все, на самом деле.Код:

import multiprocessing.Pool

p = multiprocessing.Pool() # use all available CPUs

input = open("input.txt")
x = (process_line(line) for line in input)
y = p.imap(process_item, x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

Я не проверял, но это основная идея.Метод imap пула гарантирует, что результаты будут возвращены в правильном порядке.

...