Реализация отсортированной очереди производителя / потребителя с помощью Multiprocessing - PullRequest
1 голос
/ 02 октября 2019

У меня есть довольно распространенный сценарий производителя / потребителя, с одним поворотом.

Мне нужно прочитать строки текста из входного потока объемом в несколько гигабайт (который может быть файлом или потоком HTTP);обрабатывать каждую строку медленным и ресурсоемким алгоритмом, который будет выводить строку текста для каждой строки ввода;затем запишите выходные строки в другой поток. Суть в том, что мне нужно записать выходные строки в том же порядке, что и входные строки, которые их породили.

Обычный подход к этим сценариям - использовать многопроцессорную работу. с очередью, подающей строки (на самом деле, пакетами строк) из процесса чтения, и другой очередью, ведущей из пула в процесс записи:

                       / [Pool] \    
  [Reader] --> InQueue --[Pool]---> OutQueue --> [Writer]
                       \ [Pool] /

Но как я могу убедиться, что выходные строки(или пакеты) сортируются в правильном порядке?

Один простой ответ: «просто записать их во временный файл, затем отсортировать файл и записать его в вывод». Я мог бы в конечном итоге сделать это, но я действительно хотел бы начать потоковую передачу выходных строк как можно скорее - в отличие от ожидания обработки всего входного потока от начала до конца.

Я мог бы легконаписать свою собственную реализацию multiprocessing.Queue, которая будет сортировать свои элементы внутренне, используя словарь (или список с циклическим буфером), блокировку и два условия (плюс, возможно, целочисленный счетчик). Однако мне нужно было бы получить все эти объекты из диспетчера, и я боюсь, что использование такого общего состояния между несколькими процессами снизит мою производительность. Итак, есть ли подходящие пути Pythony для решения этой проблемы?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...