Асинхронный подпроцесс в Windows - PullRequest
4 голосов
/ 31 марта 2010

Прежде всего, общая проблема, которую я решаю, немного сложнее, чем я показываю здесь, поэтому, пожалуйста, не говорите мне «использовать потоки с блокировкой», поскольку это не решило бы мою реальную ситуацию без честного, ЧЕСТНОГО бит переписывания и рефакторинга.

У меня есть несколько приложений, которые я не могу изменить, которые берут данные из stdin и выкачивают их на stdout после выполнения своей магии. Моя задача - связать несколько таких программ Проблема в том, что иногда они задыхаются, и поэтому мне нужно отслеживать их прогресс, который выводится на STDERR.

pA = subprocess.Popen(CommandA,  shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# ... some more processes make up the chain, but that is irrelevant to the problem
pB = subprocess.Popen(CommandB, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=pA.stdout )

Теперь, чтение напрямую через pA.stdout.readline () и pB.stdout.readline () или обычные функции read () является проблемой блокировки. Поскольку разные приложения выводят данные с разной скоростью и в разных форматах, блокировка невозможна. (И, как я писал выше, многопоточность - это не вариант, если, конечно, в крайнем случае.) pA.communicate() безопасна для тупиков, но поскольку мне нужна живая информация, это тоже не вариант.

Таким образом, Google привел меня к этому асинхронному фрагменту подпроцесса в ActiveState.

Сначала все хорошо, пока я не осуществлю это. Сравнивая вывод cmd.exe pA.exe | pB.exe, игнорируя тот факт, что оба вывода выводятся в одно и то же окно, создавая беспорядок, я вижу очень мгновенные обновления. Однако я реализую то же самое, используя приведенный выше фрагмент и объявленную там функцию read_some(), и для уведомления об обновлениях одного канала требуется более 10 секунд. Но когда он это делает, у него есть обновления, ведущие к 40% прогрессу, например.

Таким образом, я провожу еще несколько исследований и вижу многочисленные темы, касающиеся PeekNamedPipe, анонимных дескрипторов и возврата 0 байтов, даже если в канале есть информация. Поскольку этот предмет оказался вне моей компетенции для исправления или кодирования, я приехал в Stack Overflow, чтобы найти руководство. :)

Моя платформа - W7 64-битная с Python 2.6, приложения 32-битные на случай, если это имеет значение, и совместимость с Unix не является проблемой. Я даже могу иметь дело с полным решением ctypes или pywin32, которое полностью подрывает подпроцесс, если это единственное решение, при условии, что я могу читать из каждого канала stderr асинхронно с немедленной производительностью и без блокировок. :)

Ответы [ 4 ]

4 голосов
/ 04 апреля 2010

Насколько плохо использовать потоки? Я столкнулся с такой же проблемой и в итоге решил использовать потоки, чтобы собрать все данные в stdout и stderr подпроцесса и поместить их в потокобезопасную очередь, которую основной поток может читать блокирующим образом, без необходимости беспокойтесь о потоке, происходящем за кулисами.

Не ясно, какие проблемы вы ожидаете с решением на основе потоков и блокировок. Вы беспокоитесь о том, чтобы сделать остальную часть своего кода поточно-ориентированным? Это не должно быть проблемой, поскольку поток ввода-вывода не должен взаимодействовать с остальной частью вашего кода или данных. Если у вас очень строгие требования к памяти или ваш конвейер очень длинный, то, возможно, вам не понравится порождать так много потоков. Я не знаю достаточно о вашей ситуации, поэтому я не могу сказать, может ли это быть проблемой, но мне кажется, что, поскольку вы уже порождаете дополнительные процессы, несколько потоков для взаимодействия с ними не должны быть ужасное бремя. В моей ситуации я не обнаружил, что эти потоки ввода-вывода особенно проблематичны.

Моя функция потока выглядела примерно так:

def simple_io_thread(pipe, queue, tag, stop_event):
    """
    Read line-by-line from pipe, writing (tag, line) to the
    queue. Also checks for a stop_event to give up before
    the end of the stream.
    """
    while True:
        line = pipe.readline()

        while True:
            try:
                # Post to the queue with a large timeout in case the
                # queue is full.
                queue.put((tag, line), block=True, timeout=60)
                break
            except Queue.Full:
                if stop_event.isSet():
                    break
                continue
        if stop_event.isSet() or line=="":
            break
    pipe.close()

Когда я запускаю подпроцесс, я делаю это:

outputqueue = Queue.Queue(50)
stop_event = threading.Event()
process = subprocess.Popen(
    command,
    cwd=workingdir,
    env=env,
    shell=useshell,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
stderr_thread = threading.Thread(
    target=simple_io_thread,
    args=(process.stderr, outputqueue, "STDERR", stop_event)
)
stdout_thread = threading.Thread(
    target=simple_io_thread,
    args=(process.stdout, outputqueue, "STDOUT", stop_event)
)
stderr_thread.daemon = True
stdout_thread.daemon = True
stderr_thread.start()
stdout_thread.start()

Тогда, когда я захочу прочитать, я могу просто заблокировать выходную очередь - каждый прочитанный из него элемент содержит либо строку, чтобы определить, из какого канала он поступил, и строку текста из этого канала. Очень маленький код выполняется в отдельном потоке, и он связывается с основным потоком только через потокобезопасную очередь (плюс событие на случай, если мне придется отказаться раньше). Возможно, этот подход был бы полезен и позволил бы вам решить проблему с потоками и блокировками, но без необходимости переписывать много кода?

(Мое решение усложняется, потому что я иногда хочу досрочно завершить подпроцессы и хочу быть уверенным, что все потоки закончатся. Если это не проблема, вы можете избавиться от всего, что связано с stop_event, и оно становится довольно красивым сжато.)

1 голос
/ 08 апреля 2010

Я предполагаю, что конвейер процесса не будет тупиковым, если он использует только stdin и stdout;и проблема, которую вы пытаетесь решить, состоит в том, как сделать так, чтобы она не зашла в тупик, если они пишут в stderr (и должны иметь дело с возможным резервным копированием stderr).

Если вы разрешаете нескольким процессам писать в stderr, Вы должны остерегаться их смешения результатов.Я предполагаю, что у вас есть что-то отсортировано;просто, чтобы убедиться в этом.

Помните о флаге -u для python;При тестировании полезно выяснить, не мешает ли вам буферизация ОС.

Если вы хотите эмулировать select () для файловых дескрипторов в win32, ваш единственный выбор - использовать PeekNamedPipe () и друзей.У меня есть фрагмент кода, который считывает линейно-ориентированный вывод из нескольких процессов одновременно, который вы даже можете использовать напрямую - попробуйте передать ему список дескрипторов proc.stderr и начинайте.

class NoLineError(Exception): pass
class NoMoreLineError(Exception): pass
class LineReader(object):
    """Helper class for multi_readlines."""
    def __init__(self, f):
        self.fd = f.fileno()
        self.osf = msvcrt.get_osfhandle(self.fd)
        self.buf = ''

    def getline(self):
        """Returns a line of text, or raises NoLineError, or NoMoreLineError."""
        try:
            _, avail, _ = win32pipe.PeekNamedPipe(self.osf, 0)
            bClosed = False
        except pywintypes.error:
            avail = 0
            bClosed = True

        if avail:
            self.buf += os.read(self.fd, avail)

        idx = self.buf.find('\n')
        if idx >= 0:
            ret, self.buf = self.buf[:idx+1], self.buf[idx+1:]
            return ret
        elif bClosed:
            if self.buf:
                ret, self.buf = self.buf, None
                return ret
            else:
                raise NoMoreLineError
        else:
            raise NoLineError


def multi_readlines(fs, timeout=0):
    """Read lines from |fs|, a list of file objects.
    The lines come out in arbitrary order, depending on which files
    have output available first."""
    if type(fs) not in (list, tuple):
        raise Exception("argument must be a list.")
    objs = [LineReader(f) for f in fs]
    for i,obj in enumerate(objs): obj._index = i
    while objs:
        yielded = 0
        for i,obj in enumerate(objs):
            try:
                yield (obj._index, obj.getline())
                yielded += 1
            except NoLineError:
                #time.sleep(timeout)
                pass
            except NoMoreLineError:
                del objs[i]
                break   # Because we mutated the array

        if not yielded:
            time.sleep(timeout)
            pass

Я никогда не сталкивался с проблемой «Peek возвращает 0 байт, даже если данные доступны».Если это случается с другими, я уверен, что их libc буферизует их stdout / stderr перед отправкой данных в ОС;с этим ничего не поделаешь.Вы должны заставить приложение каким-либо образом использовать небуферизованный вывод (-u к python; вызовы win32 / libc для изменения дескриптора файла stderr, ...)

Тот факт, что вы ничего не видите, затем тонна обновлений, заставляет меня думать, что ваша проблема в буферизации на исходном конце.Win32 libc может буферизоваться по-разному, если он пишет в канал, а не в консоль.Опять же, лучшее, что вы можете сделать из-за пределов этих программ, - это агрессивно истощить их вывод.

0 голосов
/ 01 апреля 2010

Разве вы не можете просто выполнить неблокирующее чтение из pA.stdout и pB.stdout? Вы будете иметь плотную занятую петлю, чтобы управлять?

Неблокирующее чтение для подпроцесса. ТРУБА в python

0 голосов
/ 01 апреля 2010

А как насчет использования Twisted FD? http://twistedmatrix.com/documents/8.1.0/api/twisted.internet.fdesc.html

Это не асинхронный, но неблокирующий. Для асинхронной работы вы можете использовать Twisted?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...