Правильный способ выхода из функции, вызываемой мультипроцессором. - PullRequest
0 голосов
/ 09 октября 2018

Как выйти из функции под названием my multiprocessing.Pool

Вот пример кода, который я использую, когда я ставлю условие для выхода из функции worker, когда я использую это как скрипт в терминалеостанавливается и не выходит.

def worker(n):
    if n == 4:
        exit("wrong number")  # tried to use sys.exit(1) did not work
    return n*2

def caller(mylist, n=1):
    n_cores = n if n > 1 else multiprocessing.cpu_count()
    print(n_cores)
    pool = multiprocessing.Pool(processes=n_cores)
    result = pool.map(worker, mylist)
    pool.close()
    pool.join()
    return result

l = [2, 3, 60, 4]
myresult = caller(l, 4)

Ответы [ 2 ]

0 голосов
/ 10 октября 2018

С pool.map дело в том, что он вызовет исключения из дочерних процессов только после . Все задачи завершены.Но ваши комментарии звучат так, будто вам нужно немедленно прервать всю обработку, как только в каком-либо процессе будет обнаружено неправильное значение.Тогда это будет задание для pool.apply_async.

pool.apply_async предложений error_callbacks, которые можно использовать для прекращения пула.Рабочих будут кормить поэтапно, а не по частям, как в вариантах pool.map, поэтому у вас есть шанс досрочного выхода из каждого обработанного аргумента.

Я в основном повторяю свой ответ из здесь :

from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    if x == 4:
        raise ValueError(f'wrong number: {x}')
    return x * 2

def on_error(e):
    if type(e) is ValueError:
        global terminated
        terminated = True
        pool.terminate()
        print(f"oops: {type(e).__name__}('{e}')")


def main():
    global pool
    global terminated

    terminated = False

    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(10)]
    pool.close()
    pool.join()

    if not terminated:
        for r in results:
            print(r.get())


if __name__ == '__main__':
    main()

Вывод:

f(0)
f(1)
f(2)
f(3)
f(4)
oops: ValueError('wrong number: 4')

Process finished with exit code 0
0 голосов
/ 09 октября 2018

Как я уже сказал, я не думаю, что вы можете выйти из задачи, выполняющей основной сценарий, из рабочего процесса.

Вы точно не объяснили , почему вы хотите сделатьэто, так что этот ответ является предположением, но, возможно, поднятие пользовательского Exception и обработка его в явном выражении except, как показано ниже, будет приемлемым способом обойти ограничение.

import multiprocessing
import sys

class WorkerStopException(Exception):
    pass

def worker(n):
    if n == 4:
        raise WorkerStopException()
    return n*2

def caller(mylist, n=1):
    n_cores = n if n > 1 else multiprocessing.cpu_count()
    print(n_cores)
    pool = multiprocessing.Pool(processes=n_cores)
    try:
        result = pool.map(worker, mylist)
    except WorkerStopException:
        sys.exit("wrong number")
    pool.close()
    pool.join()
    return result

if __name__ == '__main__':
    l = [2, 3, 60, 4]
    myresult = caller(l, 4)

Выводится выводпри запуске:

4
wrong number

(4 - это количество процессоров в моей системе.)

...