Как прервать методы multiprocessing.Pool () и получить частичные результаты? - PullRequest
0 голосов
/ 13 марта 2019

Допустим, у меня есть дорогая пара функция / итерация, которую я хотел бы распараллелить с multiprocessing.Pool() * .map() или .map_async() методами класса:

import time
from multiprocessing import Pool

def my_expensive_function(x):
    time.sleep(x)
    return x*x

list_of_nums = list(range(0,100))

with Pool() as p:
    nums_sq = p.map_async(func = my_expensive_function, 
                          iterable = list_of_nums).get()

print(nums_sq)

Несмотря на то, что наш код распараллелен с .map_async() и намного быстрее, чем если бы мы просто использовали цикл for, я хотел бы иметь возможность прерывать .map_async(), чтобы преждевременно завершить дочерние процессы и получить частичные результаты.

Наивный блок-триггер не работает, без дела:

import time
from multiprocessing import Pool

# Now with try/except:

def my_expensive_function(x):
    time.sleep(x)
    return x*x

list_of_nums = list(range(0,100))

with Pool() as p:
    try:
        nums_sq = p.map_async(func = my_expensive_function, 
                               iterable = list_of_nums).get()
    except KeyboardInterrupt:
        pass

# If interrupted before .map_async() has completed...

print(nums_sq) # NameError: name 'nums_sq2' is not defined

(Стоит отметить, что использование аналогичного блока try-exc в цикле for будет работать, если цикл for повторяется хотя бы один раз до прерывания клавиатуры.)

Итак, какие модификации нужно было бы сделать здесь, чтобы определить nums_sq и указать на объект с частичными результатами вызова .map_async()?

...