Многопроцессорная обработка Python: как Я НАДЕЖНО перенаправить стандартный вывод из дочернего процесса? - PullRequest
26 голосов
/ 10 октября 2011

NB. Я видел Выход журнала многопроцессорной обработки. Процесс - к сожалению, он не отвечает на этот вопрос.

Я создаю дочерний процесс (в Windows) с помощью многопроцессорной обработки. Я хочу, чтобы все вывода stdout и stderr дочернего процесса были перенаправлены в файл журнала, а не появлялись на консоли. Единственное предложение, которое я видел, это для дочернего процесса установить sys.stdout в файл. Однако это не позволяет эффективно перенаправить весь вывод stdout из-за поведения перенаправления stdout в Windows.

Чтобы проиллюстрировать проблему, создайте Windows DLL со следующим кодом

#include <iostream>

extern "C"
{
    __declspec(dllexport) void writeToStdOut()
    {
        std::cout << "Writing to STDOUT from test DLL" << std::endl;
    }
}

Затем создайте и запустите скрипт python, как показано ниже, который импортирует эту DLL и вызывает функцию:

from ctypes import *
import sys

print
print "Writing to STDOUT from python, before redirect"
print
sys.stdout = open("stdout_redirect_log.txt", "w")
print "Writing to STDOUT from python, after redirect"

testdll = CDLL("Release/stdout_test.dll")
testdll.writeToStdOut()

Чтобы увидеть то же поведение, что и у меня, вероятно, необходимо, чтобы DLL была построена с использованием другой среды выполнения C, чем та, которую использует Python. В моем случае python построен на Visual Studio 2010, но моя DLL создана на VS 2005.

Я вижу, что консоль показывает:

> stdout_test.py

Writing to STDOUT from python, before redirect

Writing to STDOUT from test DLL

В то время как файл stdout_redirect_log.txt в конечном итоге содержит:

Writing to STDOUT from python, after redirect

Другими словами, установка sys.stdout не смогла перенаправить вывод stdout, сгенерированный DLL. Это неудивительно, учитывая природу базовых API для перенаправления стандартного вывода в Windows. Я сталкивался с этой проблемой на родном уровне / C ++ и никогда не нашел способа надежно перенаправить стандартный вывод изнутри процесса. Это должно быть сделано извне.

Это на самом деле и есть причина, по которой я запускаю дочерний процесс - чтобы я мог подключаться к его каналам извне и, таким образом, гарантировать, что я перехватываю весь его вывод. Я определенно могу сделать это, запустив процесс вручную с помощью pywin32, но я бы очень хотел иметь возможность использовать средства многопроцессорности, в частности возможность общаться с дочерним процессом через многопроцессорный объект Pipe, чтобы получить прогресс обновления. Вопрос в том, существует ли какой-либо способ использовать многопроцессорную обработку для своих возможностей IPC и , чтобы надежно перенаправить весь вывод дочернего элемента stdout и stderr в файл.

ОБНОВЛЕНИЕ: Глядя на исходный код для многопроцессорной обработки. У процессов есть статический член _Popen, который выглядит так, как будто он может использоваться для переопределения класса, используемого для создания процесса. Если для него установлено значение «Нет» (по умолчанию), он использует multiprocessing.forking._Popen, но выглядит так:

multiprocessing.Process._Popen = MyPopenClass

Я мог бы отменить создание процесса. Однако, хотя я мог бы извлечь это из multiprocessing.forking._Popen, похоже, мне пришлось бы скопировать кучу внутренних вещей в мою реализацию, что звучит странно и не очень перспективно для будущего. Если это единственный выбор, я думаю, что я бы, пожалуй, был полон решимости сделать все вручную с помощью pywin32.

Ответы [ 4 ]

8 голосов
/ 02 августа 2012

Хорошее решение, которое вы предлагаете: создайте свои процессы вручную, чтобы у вас был явный доступ к их дескрипторам файла stdout / stderr.Затем вы можете создать сокет для связи с подпроцессом и использовать multiprocessing.connection через этот сокет (multiprocessing.Pipe создает объект соединения того же типа, так что это должно предоставить вам все те же функциональные возможности IPC).

Вот пример из двух файлов.

master.py:

import multiprocessing.connection
import subprocess
import socket
import sys, os

## Listen for connection from remote process (and find free port number)
port = 10000
while True:
    try:
        l = multiprocessing.connection.Listener(('localhost', int(port)), authkey="secret")
        break
    except socket.error as ex:
        if ex.errno != 98:
            raise
        port += 1  ## if errno==98, then port is not available.

proc = subprocess.Popen((sys.executable, "subproc.py", str(port)), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

## open connection for remote process
conn = l.accept()
conn.send([1, "asd", None])
print(proc.stdout.readline())

subproc.py:

import multiprocessing.connection
import subprocess
import sys, os, time

port = int(sys.argv[1])
conn = multiprocessing.connection.Client(('localhost', port), authkey="secret")

while True:
    try:
        obj = conn.recv()
        print("received: %s\n" % str(obj))
        sys.stdout.flush()
    except EOFError:  ## connection closed
        break

Вы также можете захотеть увидеть первый ответ на этот вопрос , чтобы получить неблокирующие чтения из подпроцесса.

1 голос
/ 31 октября 2011

Я не думаю, что у вас есть лучший вариант, чем перенаправление подпроцесса в файл, как вы упомянули в своем комментарии.

Способ, которым консоль stdin / out / err работает в Windows, заключается в том, что каждый процесс, в котором он родился, имеет свои std дескрипторы . Вы можете изменить их с помощью SetStdHandle . Когда вы модифицируете Python sys.stdout, вы изменяете только там, где Python печатает материал, а не там, где другие DLL печатают материал. Часть CRT в вашей DLL использует GetStdHandle, чтобы выяснить, куда печатать. Если вы хотите, вы можете делать все, что вам нужно, в Windows API в вашей DLL или в вашем скрипте python с pywin32. Хотя я думаю, что это будет проще с подпроцессом .

0 голосов
/ 23 января 2019

В моей ситуации я изменил sys.stdout.write для записи в PySide QTextEdit. Я не мог читать с sys.stdout и не знал, как изменить sys.stdout, чтобы сделать его читаемым. Я создал две трубы. Один для stdout, а другой для stderr. В отдельном процессе я перенаправляю sys.stdout и sys.stderr на дочернее соединение многопроцессорного канала. В основном процессе я создал два потока для чтения родительского канала stdout и stderr и перенаправления данных канала на sys.stdout и sys.stderr.

import sys
import contextlib
import threading
import multiprocessing as mp
import multiprocessing.queues
from queue import Empty
import time


class PipeProcess(mp.Process):
    """Process to pipe the output of the sub process and redirect it to this sys.stdout and sys.stderr.

    Note:
        The use_queue = True argument will pass data between processes using Queues instead of Pipes. Queues will
        give you the full output and read all of the data from the Queue. A pipe is more efficient, but may not
        redirect all of the output back to the main process.
    """
    def __init__(self, group=None, target=None, name=None, args=tuple(), kwargs={}, *_, daemon=None,
                 use_pipe=None, use_queue=None):
        self.read_out_th = None
        self.read_err_th = None
        self.pipe_target = target
        self.pipe_alive = mp.Event()

        if use_pipe or (use_pipe is None and not use_queue):  # Default
            self.parent_stdout, self.child_stdout = mp.Pipe(False)
            self.parent_stderr, self.child_stderr = mp.Pipe(False)
        else:
            self.parent_stdout = self.child_stdout = mp.Queue()
            self.parent_stderr = self.child_stderr = mp.Queue()

        args = (self.child_stdout, self.child_stderr, target) + tuple(args)
        target = self.run_pipe_out_target

        super(PipeProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs,
                                          daemon=daemon)

    def start(self):
        """Start the multiprocess and reading thread."""
        self.pipe_alive.set()
        super(PipeProcess, self).start()

        self.read_out_th = threading.Thread(target=self.read_pipe_out,
                                            args=(self.pipe_alive, self.parent_stdout, sys.stdout))
        self.read_err_th = threading.Thread(target=self.read_pipe_out,
                                            args=(self.pipe_alive, self.parent_stderr, sys.stderr))
        self.read_out_th.daemon = True
        self.read_err_th.daemon = True
        self.read_out_th.start()
        self.read_err_th.start()

    @classmethod
    def run_pipe_out_target(cls, pipe_stdout, pipe_stderr, pipe_target, *args, **kwargs):
        """The real multiprocessing target to redirect stdout and stderr to a pipe or queue."""
        sys.stdout.write = cls.redirect_write(pipe_stdout)  # , sys.__stdout__)  # Is redirected in main process
        sys.stderr.write = cls.redirect_write(pipe_stderr)  # , sys.__stderr__)  # Is redirected in main process

        pipe_target(*args, **kwargs)

    @staticmethod
    def redirect_write(child, out=None):
        """Create a function to write out a pipe and write out an additional out."""
        if isinstance(child, mp.queues.Queue):
            send = child.put
        else:
            send = child.send_bytes  # No need to pickle with child_conn.send(data)

        def write(data, *args):
            try:
                if isinstance(data, str):
                    data = data.encode('utf-8')

                send(data)
                if out is not None:
                    out.write(data)
            except:
                pass
        return write

    @classmethod
    def read_pipe_out(cls, pipe_alive, pipe_out, out):
        if isinstance(pipe_out, mp.queues.Queue):
            # Queue has better functionality to get all of the data
            def recv():
                return pipe_out.get(timeout=0.5)

            def is_alive():
                return pipe_alive.is_set() or pipe_out.qsize() > 0
        else:
            # Pipe is more efficient
            recv = pipe_out.recv_bytes  # No need to unpickle with data = pipe_out.recv()
            is_alive = pipe_alive.is_set

        # Loop through reading and redirecting data
        while is_alive():
            try:
                data = recv()
                if isinstance(data, bytes):
                    data = data.decode('utf-8')
                out.write(data)
            except EOFError:
                break
            except Empty:
                pass
            except:
                pass

    def join(self, *args):
        # Wait for process to finish (unless a timeout was given)
        super(PipeProcess, self).join(*args)

        # Trigger to stop the threads
        self.pipe_alive.clear()

        # Pipe must close to prevent blocking and waiting on recv forever
        if not isinstance(self.parent_stdout, mp.queues.Queue):
            with contextlib.suppress():
                self.parent_stdout.close()
            with contextlib.suppress():
                self.parent_stderr.close()

        # Close the pipes and threads
        with contextlib.suppress():
            self.read_out_th.join()
        with contextlib.suppress():
            self.read_err_th.join()

def run_long_print():
    for i in range(1000):
        print(i)
        print(i, file=sys.stderr)

    print('finished')


if __name__ == '__main__':
    # Example test write (My case was a QTextEdit)
    out = open('stdout.log', 'w')
    err = open('stderr.log', 'w')

    # Overwrite the write function and not the actual stdout object to prove this works
    sys.stdout.write = out.write
    sys.stderr.write = err.write

    # Create a process that uses pipes to read multiprocess output back into sys.stdout.write
    proc = PipeProcess(target=run_long_print, use_queue=True)  # If use_pipe=True Pipe may not write out all values
    # proc.daemon = True  # If daemon and use_queue Not all output may be redirected to stdout
    proc.start()

    # time.sleep(5)  # Not needed unless use_pipe or daemon and all of stdout/stderr is desired

    # Close the process
    proc.join()  # For some odd reason this blocks forever when use_queue=False

    # Close the output files for this test
    out.close()
    err.close()
0 голосов
/ 23 января 2012

Я предполагаю, что я не в базе и что-то упустил, но вот что стоит того, что пришло в голову, когда я прочитал ваш вопрос.

Если вы можете перехватить все stdout и stderr (у меня сложилось впечатление из вашего вопроса), то почему бы не добавить или обернуть эту функцию захвата вокруг каждого из ваших процессов? Затем отправьте то, что записано через очередь, потребителю, который может делать все, что вы хотите со всеми выходами?

...