Как подключить отладчик к подпроцессу Python? - PullRequest
27 голосов
/ 17 января 2011

Мне нужно отладить дочерний процесс, порожденный multiprocessing.Process().Дегаггер pdb, похоже, не знает о разветвлении и не может подключиться к уже запущенным процессам.

Существуют ли более умные отладчики python, которые можно подключить к подпроцессу?

Ответы [ 6 ]

49 голосов
/ 14 мая 2014

Я искал простое решение этой проблемы и придумал следующее:

import sys
import pdb

class ForkedPdb(pdb.Pdb):
    """A Pdb subclass that may be used
    from a forked multiprocessing child

    """
    def interaction(self, *args, **kwargs):
        _stdin = sys.stdin
        try:
            sys.stdin = open('/dev/stdin')
            pdb.Pdb.interaction(self, *args, **kwargs)
        finally:
            sys.stdin = _stdin

Используйте его так же, как и классический Pdb:

ForkedPdb().set_trace()
12 голосов
/ 18 января 2011

Winpdb - это определение более умного отладчика Python.Он явно поддерживает переход от вилки , но не уверен, что он хорошо работает с multiprocessing.Process(), но стоит попробовать.

Список кандидатов для проверки поддержки вашего варианта использования см.список Python Debuggers в вики.

6 голосов
/ 05 августа 2015

Это уточнение ответа Ромуальда, который восстанавливает исходный стандарт с использованием его файлового дескриптора.Это позволяет readline работать внутри отладчика.Кроме того, в pdb специальное управление KeyboardInterrupt отключено, чтобы не мешать многопроцессорному обработчику sigint.

class ForkablePdb(pdb.Pdb):

    _original_stdin_fd = sys.stdin.fileno()
    _original_stdin = None

    def __init__(self):
        pdb.Pdb.__init__(self, nosigint=True)

    def _cmdloop(self):
        current_stdin = sys.stdin
        try:
            if not self._original_stdin:
                self._original_stdin = os.fdopen(self._original_stdin_fd)
            sys.stdin = self._original_stdin
            self.cmdloop()
        finally:
            sys.stdin = current_stdin
1 голос
/ 31 мая 2018

Опираясь на идею @memplex, мне пришлось изменить ее, чтобы она работала с joblib, установив sys.stdin в конструкторе и передав его напрямую через joblib.

import os
import pdb
import signal
import sys
import joblib

_original_stdin_fd = None

class ForkablePdb(pdb.Pdb):
    _original_stdin = None
    _original_pid = os.getpid()

    def __init__(self):
        pdb.Pdb.__init__(self)
        if self._original_pid != os.getpid():
            if _original_stdin_fd is None:
                raise Exception("Must set ForkablePdb._original_stdin_fd to stdin fileno")

            self.current_stdin = sys.stdin
            if not self._original_stdin:
                self._original_stdin = os.fdopen(_original_stdin_fd)
            sys.stdin = self._original_stdin

    def _cmdloop(self):
        try:
            self.cmdloop()
        finally:
            sys.stdin = self.current_stdin

def handle_pdb(sig, frame):
    ForkablePdb().set_trace(frame)

def test(i, fileno):
    global _original_stdin_fd
    _original_stdin_fd = fileno
    while True:
        pass    

if __name__ == '__main__':
    print "PID: %d" % os.getpid()
    signal.signal(signal.SIGUSR2, handle_pdb)
    ForkablePdb().set_trace()
    fileno = sys.stdin.fileno()
    joblib.Parallel(n_jobs=2)(joblib.delayed(test)(i, fileno) for i in range(10))
0 голосов
/ 20 марта 2018

У меня была идея создать «фиктивные» классы, чтобы имитировать реализацию методов, которые вы используете из многопроцессорной обработки:

from multiprocessing import Pool


class DummyPool():
    @staticmethod
    def apply_async(func, args, kwds):
        return DummyApplyResult(func(*args, **kwds))

    def close(self): pass
    def join(self): pass


class DummyApplyResult():
    def __init__(self, result):
        self.result = result

    def get(self):
        return self.result


def foo(a, b, switch):
    # set trace when DummyPool is used
    # import ipdb; ipdb.set_trace()
    if switch:
        return b - a
    else:
        return a - b


if __name__ == '__main__':
    xml = etree.parse('C:/Users/anmendoza/Downloads/jim - 8.1/running-config.xml')
    pool = DummyPool()  # switch between Pool() and DummyPool() here
    results = []
    results.append(pool.apply_async(foo, args=(1, 100), kwds={'switch': True}))
    pool.close()
    pool.join()
    results[0].get()
0 голосов
/ 17 января 2011

Если вы на поддерживаемой платформе, попробуйте DTrace .Большинство семейства BSD / Solaris / OS X поддерживают DTrace.

Вот вступительное слово автора .Вы можете использовать Dtrace для отладки чего угодно.

Вот ТАК сообщение об обучении DTrace.

...