У меня есть довольно распространенный сценарий производителя / потребителя, с одним поворотом.
Мне нужно прочитать строки текста из входного потока объемом в несколько гигабайт (который может быть файлом или потоком HTTP);обрабатывать каждую строку медленным и ресурсоемким алгоритмом, который будет выводить строку текста для каждой строки ввода;затем запишите выходные строки в другой поток. Суть в том, что мне нужно записать выходные строки в том же порядке, что и входные строки, которые их породили.
Обычный подход к этим сценариям - использовать многопроцессорную работу. с очередью, подающей строки (на самом деле, пакетами строк) из процесса чтения, и другой очередью, ведущей из пула в процесс записи:
/ [Pool] \
[Reader] --> InQueue --[Pool]---> OutQueue --> [Writer]
\ [Pool] /
Но как я могу убедиться, что выходные строки(или пакеты) сортируются в правильном порядке?
Один простой ответ: «просто записать их во временный файл, затем отсортировать файл и записать его в вывод». Я мог бы в конечном итоге сделать это, но я действительно хотел бы начать потоковую передачу выходных строк как можно скорее - в отличие от ожидания обработки всего входного потока от начала до конца.
Я мог бы легконаписать свою собственную реализацию multiprocessing.Queue, которая будет сортировать свои элементы внутренне, используя словарь (или список с циклическим буфером), блокировку и два условия (плюс, возможно, целочисленный счетчик). Однако мне нужно было бы получить все эти объекты из диспетчера, и я боюсь, что использование такого общего состояния между несколькими процессами снизит мою производительность. Итак, есть ли подходящие пути Pythony для решения этой проблемы?