Python: получение трассировки от многопроцессорной обработки. Процесс - PullRequest
33 голосов
/ 25 мая 2011

Я пытаюсь получить объект трассировки от многопроцессорной обработки. Процесс.К сожалению, передача информации об исключении через канал не работает, поскольку объекты трассировки не могут быть обработаны:

def foo(pipe_to_parent):
    try:
        raise Exception('xxx')
    except:
        pipe_to_parent.send(sys.exc_info())

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print traceback.format_exception(*exc_info)
to_child.close()
to_self.close()

Трассировка:

Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap
    self.run()
  File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
    self._target(*self._args, **self._kwargs)
  File "foo", line 7, in foo
    to_parent.send(sys.exc_info())
PicklingError: Can't pickle <type 'traceback'>: attribute lookup __builtin__.traceback failed

Есть ли другой способ доступа к информации об исключении?Я хотел бы избежать передачи отформатированной строки.

Ответы [ 5 ]

27 голосов
/ 18 мая 2013

Поскольку multiprocessing печатает строковое содержимое исключений, созданных в дочерних процессах, вы можете обернуть весь код своего дочернего процесса в try-исключение, за исключением того, что он перехватывает любые исключения, форматирует трассировки связанного стека и вызывает новый Exceptionкоторая содержит всю соответствующую информацию в своей строке:

Пример функции, которую я использую с multiprocessing.map:

def run_functor(functor):
    """
    Given a no-argument functor, run it and return its result. We can 
    use this with multiprocessing.map and map it over a list of job 
    functors to do them.

    Handles getting more than multiprocessing's pitiful exception output
    """

    try:
        # This is where you do your actual work
        return functor()
    except:
        # Put all exception text into an exception and raise that
        raise Exception("".join(traceback.format_exception(*sys.exc_info())))

В результате получается трассировка стека с другой отформатированной трассировкой стека, каксообщение об ошибке, помогающее при отладке.

23 голосов
/ 29 сентября 2014

Используя tblib, вы можете передать завернутые исключения и вызвать их позже:

import tblib.pickling_support
tblib.pickling_support.install()

from multiprocessing import Pool
import sys


class ExceptionWrapper(object):

    def __init__(self, ee):
        self.ee = ee
        __,  __, self.tb = sys.exc_info()

    def re_raise(self):
        raise self.ee.with_traceback(self.tb)
        # for Python 2 replace the previous line by:
        # raise self.ee, None, self.tb


# example how to use ExceptionWrapper

def inverse(i):
    """will fail for i == 0"""
    try:
        return 1.0 / i
    except Exception as e:
        return ExceptionWrapper(e)


def main():
    p = Pool(1)
    results = p.map(inverse, [0, 1, 2, 3])
    for result in results:
        if isinstance(result, ExceptionWrapper):
            result.re_raise()


if __name__ == "__main__":
    main()

Итак, если вы поймали исключение в вашем удаленном процессе, оберните его ExceptionWrapper и затем передайте обратно. Вызов re_reraise в основном процессе сделает всю работу.

13 голосов
/ 25 мая 2011

Кажется трудным сделать возможность отлова объекта трассировки. Но вы можете отправить только 2 первых элемента sys.exc_info() и предварительно отформатированную информацию трассировки с помощью метода traceback.extract_tb :

import multiprocessing
import sys
import traceback

def foo(pipe_to_parent):
    try:
        raise Exception('xxx')
    except:
        except_type, except_class, tb = sys.exc_info()
        pipe_to_parent.send((except_type, except_class, traceback.extract_tb(tb)))

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print exc_info
to_child.close()
to_self.close()

, которые дают вам:

(<type 'exceptions.Exception'>, Exception('xxx',), [('test_tb.py', 7, 'foo', "raise Exception('xxx')")])

И тогда вы сможете получить больше информации о причине исключения (имя файла, номер строки, где возникло исключение, имя метода и оператор, который вызывает исключение)

6 голосов
/ 05 апреля 2017

Python 3

В Python 3 теперь метод get для multiprocessing.pool.Async возвращает полный возврат, см. http://bugs.python.org/issue13831.

Python 2

Используйте traceback.format_exc (что означает форматированное ожидание), чтобы получить строку трассировки. Сделать декоратор было бы гораздо удобнее, как показано ниже.

def full_traceback(func):
    import traceback, functools
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            msg = "{}\n\nOriginal {}".format(e, traceback.format_exc())
            raise type(e)(msg)
    return wrapper

Пример:

def func0():
    raise NameError("func0 exception")

def func1():
    return func0()

# Key is here!
@full_traceback
def main(i):
    return func1()

if __name__ == '__main__':
    from multiprocessing import Pool
    pool = Pool(4)
    try:
        results = pool.map_async(main, range(5)).get(1e5)
    finally:
        pool.close()
        pool.join()

трассировка с декоратором:

Traceback (most recent call last):
  File "bt.py", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: Exception in func0

Original Traceback (most recent call last):
  File "bt.py", line 13, in wrapper
    return func(*args, **kwargs)
  File "bt.py", line 27, in main
    return func1()
  File "bt.py", line 23, in func1
    return func0()
  File "bt.py", line 20, in func0
    raise NameError("Exception in func0")
NameError: Exception in func0

traceback без декоратора:

Traceback (most recent call last):
  File "bt.py", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: Exception in func0
3 голосов
/ 02 февраля 2017

Это вариант этот отличный ответ . Оба используют tblib для хранения трассировки.

Однако вместо того, чтобы возвращать объект исключения (как запрашивается OP), функцию worker можно оставить как есть и просто обернуть в try / except, чтобы сохранить исключения для повторного -raise.

import tblib.pickling_support
tblib.pickling_support.install()

import sys

class DelayedException(Exception):

    def __init__(self, ee):
        self.ee = ee
        __,  __, self.tb = sys.exc_info()
        super(DelayedException, self).__init__(str(ee))

    def re_raise(self):
        raise self.ee, None, self.tb

Пример

def worker():
    try:
        raise ValueError('Something went wrong.')
    except Exception as e:
        raise DelayedException(e)


if __name__ == '__main__':

    import multiprocessing

    pool = multiprocessing.Pool()
    try:
        pool.imap(worker, [1, 2, 3])
    except DelayedException as e:
        e.re_raise()
...