Исключение, созданное в многопроцессорном пуле, не обнаружено - PullRequest
64 голосов
/ 18 июля 2011

Похоже, что при возникновении исключения из процесса многопроцессорной обработки пула не происходит трассировки стека или каких-либо других признаков того, что оно завершилось неудачно. Пример:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

печатает 1 и останавливается молча. Интересно, что повышение BaseException вместо этого работает. Есть ли способ сделать поведение для всех исключений таким же, как BaseException?

Ответы [ 9 ]

50 голосов
/ 03 января 2012

Может быть, я что-то упускаю, но разве это не то, что возвращает метод get объекта Result?См. Пулы процессов .

, класс multiprocessing.pool.AsyncResult

Класс результата, возвращаемого Pool.apply_async () и Pool.map_async ().get ([timeout])
Возвращает результат, когда он прибывает.Если для параметра timeout задано значение None, а результат не приходит в течение секунд, то вызывается multiprocessing.TimeoutError.Если удаленный вызов вызвал исключение, то это исключение будет повторно вызвано get ().

Итак, слегка изменив ваш пример, можно сделать

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

, что дает в результате

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

Это не совсем удовлетворительно, так как не печатает трассировку, но лучше, чем ничего.

ОБНОВЛЕНИЕ: Эта ошибка была исправлена ​​в Python 3.4, любезно предоставленной Ричардом Оудкерком,См. Проблему get метод multiprocessing.pool.Async должен возвращать полную трассировку .

28 голосов
/ 06 октября 2011

У меня есть разумное решение проблемы, по крайней мере, для целей отладки.В настоящее время у меня нет решения, которое возвратило бы исключение в основные процессы.Моей первой мыслью было использование декоратора, но вы можете выбрать только функции, определенные на верхнем уровне модуля , так что все правильно.

Вместо этого - простой класс переноса и пулподкласс, который использует это для apply_async (и, следовательно, apply).Я оставлю map_async в качестве упражнения для читателя.

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

Это дает мне:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception
20 голосов
/ 22 февраля 2015

Решение с большинством голосов на момент написания имеет проблему:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

Как отметил @dfrankow, он будет ожидать x.get(), что разрушает точку асинхронного выполнения задачи. Таким образом, для повышения эффективности (в частности, если ваша рабочая функция go занимает много времени), я бы изменил ее на:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()

Преимущества : рабочая функция запускается асинхронно, поэтому, если, например, вы выполняете много задач на нескольких ядрах, она будет намного более эффективной, чем исходное решение.

Недостатки : если в рабочей функции есть исключение, оно будет вызвано только после , когда пул завершил все задачи. Это может или не может быть желательным поведением. РЕДАКТИРОВАНИЕ согласно комментарию @ colinfang, который исправил это.

9 голосов
/ 19 августа 2014

Я успешно записал исключения с этим декоратором:

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print 'Exception in '+func.__name__
            traceback.print_exc()
    return wrapped_func

с кодом в вопросе, это

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()

Просто украсьте функцию, которую вы передаете в пул процессов. Ключом к этой работе является @functools.wraps(func), иначе многопроцессорная обработка выдает PicklingError.

код выше дает

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception
2 голосов
/ 26 сентября 2017
import logging
from multiprocessing import Pool

def proc_wrapper(func, *args, **kwargs):
    """Print exception because multiprocessing lib doesn't return them right."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
        raise

def go(x):
    print x
    raise Exception("foobar")

p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()
1 голос
/ 12 июня 2013

Я создал модуль RemoteException.py , который показывает полную трассировку исключения в процессе. Python2. Загрузите его и добавьте в свой код:

import RemoteException

@RemoteException.showError
def go():
    raise Exception('Error!')

if __name__ == '__main__':
    import multiprocessing
    p = multiprocessing.Pool(processes = 1)
    r = p.apply(go) # full traceback is shown here
0 голосов
/ 04 ноября 2017

Поскольку уже есть приличные ответы для multiprocessing.Pool, я предоставлю решение с использованием другого подхода для полноты.

Для python >= 3.2 следующее решение кажется самым простым:

from concurrent.futures import ProcessPoolExecutor, wait

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]

Преимущества:

  • очень маленький код
  • вызывает исключение в основном процессе
  • обеспечивает трассировку стека
  • нет внешних зависимостей

Для получения дополнительной информации об API, пожалуйста, проверьте: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

Кроме того, если вы отправляете большое количество задач и хотите, чтобы ваш основной процесс завершился сбоем сразу после сбоя одной из ваших задач, вы можете использовать следующий фрагмент:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            for f in futures:
                f.cancel()
            break

[f.result() for f in futures]

Все остальные ответы терпят неудачу только после того, как все задачи выполнены.

0 голосов
/ 15 марта 2017

Так как вы использовали apply_sync, я полагаю, что сценарий использования состоит в том, чтобы выполнить некоторые задачи синхронизации. Использовать обратный вызов для обработки другой вариант. Обратите внимание, что эта опция доступна только для python3.2 и выше и недоступна для python2.7.

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do another things

p.close()
p.join()
0 голосов
/ 18 июля 2011

Я бы попробовал использовать pdb:

import pdb
import sys
def handler(type, value, tb):
  pdb.pm()
sys.excepthook = handler
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...