Должен ли я использовать сопрограммы или другой объект планирования здесь? - PullRequest
1 голос
/ 19 апреля 2011

В настоящее время у меня есть код в форме генератора, который вызывает задачу, связанную с вводом-выводом.Генератор на самом деле также вызывает суб-генераторы, так что было бы желательно более общее решение.

Что-то вроде следующего:

def processed_values(list_of_io_tasks):
    for task in list_of_io_tasks:
        value = slow_io_call(task)
        yield postprocess(value) # in real version, would iterate over 
                                 # processed_values2(value) here

У меня полный контроль над slow_io_call, и яМне все равно, в каком порядке я получу предметы от processed_values.Есть ли что-то вроде сопрограмм, которые я могу использовать, чтобы получить результат в самом быстром порядке, превратив slow_io_call в асинхронную функцию и используя тот вызов, который быстрее всего возвращает?Я ожидаю, что list_of_io_tasks будет длиной не менее тысячи записей.Я никогда не выполнял никакой параллельной работы, кроме явных потоков, и, в частности, я никогда не использовал различные доступные формы легких потоков.

Мне нужно использовать стандартную реализацию CPython, и я 'работает на Linux.

1 Ответ

2 голосов
/ 19 апреля 2011

Звучит так, будто вы ищете multiprocessing.Pool () , в частности, Pool.imap_unordered () метод.

Вот порт вашей функции для использования imap_unordered () для распараллеливания вызовов slow_io_call ().

 def processed_values(list_of_io_tasks):
     pool = multiprocessing.Pool(4) # num workers
     results = pool.imap_unordered(slow_io_call, list_of_io_tasks)
     while True:
         yield results.next(9999999) # large time-out

Обратите внимание, что вы также можете перебирать results напрямую (т. Е. for item in results: yield item) без цикла while True, однако вызов results.next() со значением времени ожидания работает вокруг этой ошибки обработки многопроцессорной клавиатуры и позволяет убить основной процесс и все подпроцессы с помощью Ctrl-C. Также обратите внимание, что исключения StopIteration не перехватываются в этой функции, но они будут сгенерированы, когда results.next() больше не будет возвращать элементы. Это допустимо для функций генератора, таких как эта, которые, как ожидается, будут либо вызывать ошибки StopItered, когда больше нет значений для выдачи, либо просто прекращают давать, и от его имени будет вызываться исключение StopIteration.

Чтобы использовать потоки вместо процессов, замените
import multiprocessing
с
import multiprocessing.dummy as multiprocessing

...