передать большой объем данных в stdin при использовании подпроцесса. Открыть - PullRequest
14 голосов
/ 06 мая 2011

Мне трудно понять, как питон решает эту простую проблему.

Моя проблема довольно проста. Если вы используете следующий код, он будет зависать. Это хорошо задокументировано в модуле подпроцесса doc.

import subprocess

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )
for i in range(100000):
    proc.stdin.write('%d\n' % i)
output = proc.communicate()[0]
print output

Поиск решения (есть очень проницательная тема, но я ее потерял). Я нашел это решение (среди прочего), которое использует явный форк:

import os
import sys
from subprocess import Popen, PIPE

def produce(to_sed):
    for i in range(100000):
        to_sed.write("%d\n" % i)
        to_sed.flush()
    #this would happen implicitly, anyway, but is here for the example
    to_sed.close()

def consume(from_sed):
    while 1:
        res = from_sed.readline()
        if not res:
            sys.exit(0)
            #sys.exit(proc.poll())
        print 'received: ', [res]

def main():
    proc = Popen(['cat','-'],stdin=PIPE,stdout=PIPE)
    to_sed = proc.stdin
    from_sed = proc.stdout

    pid = os.fork()
    if pid == 0 :
        from_sed.close()
        produce(to_sed)
        return
    else :
        to_sed.close()
        consume(from_sed)

if __name__ == '__main__':
    main()

Хотя это решение концептуально очень просто для понимания, оно использует еще один процесс и застряло на слишком низком уровне по сравнению с модулем подпроцесса (то есть просто чтобы скрыть подобные вещи ...).

Мне интересно: есть ли простое и чистое решение, использующее модуль подпроцесса, который не зависает, или для реализации этого шаблона мне нужно сделать шаг назад и реализовать цикл выбора старого стиля или явный форк? 1011 *

Спасибо

Ответы [ 10 ]

10 голосов
/ 25 декабря 2012

Если вы хотите чисто Python-решение, вам нужно поместить читателя или писателя в отдельный поток. Пакет threading - это легкий способ сделать это, с удобным доступом к обычным объектам и без беспорядочных разветвлений.

import subprocess
import threading
import sys

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )
def writer():
    for i in range(100000):
        proc.stdin.write('%d\n' % i)
    proc.stdin.close()
thread = threading.Thread(target=writer)
thread.start()
for line in proc.stdout:
    sys.stdout.write(line)
thread.join()
proc.wait()

Возможно, было бы неплохо увидеть модуль subprocess, модернизированный для поддержки потоков и сопрограмм, который позволил бы более элегантно конструировать конвейеры, смешивающие части Python и части оболочки.

7 голосов
/ 06 мая 2011

Если вы не хотите хранить все данные в памяти, вы должны использовать select.Например, что-то вроде:

import subprocess
from select import select
import os

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

i = 0;
while True:
    rlist, wlist, xlist = [proc.stdout], [], []
    if i < 100000:
        wlist.append(proc.stdin)
    rlist, wlist, xlist = select(rlist, wlist, xlist)
    if proc.stdout in rlist:
        out = os.read(proc.stdout.fileno(), 10)
        print out,
        if not out:
            break
    if proc.stdin in wlist:
        proc.stdin.write('%d\n' % i)
        i += 1
        if i >= 100000:
            proc.stdin.close()
2 голосов
/ 12 октября 2011

Вот кое-что, что я использовал для загрузки 6G mysql-файлов при загрузке через подпроцессДержитесь подальше от оболочки = True.Небезопасно и приводит к потере ресурсов.

import subprocess

fhandle = None

cmd = [mysql_path,
      "-u", mysql_user, "-p" + mysql_pass],
      "-h", host, database]

fhandle = open(dump_file, 'r')
p = subprocess.Popen(cmd, stdin=fhandle, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

(stdout,stderr) = p.communicate()

fhandle.close()
1 голос
/ 10 июня 2016

Ваш код блокируется, как только буфер канала ОС 100d * в stdout заполнен. Если вы используете stdout=PIPE; Вы должны потреблять его вовремя, иначе может произойти тупик, как в вашем случае.

Если вам не нужен вывод во время работы процесса; Вы можете перенаправить его во временный файл:

#!/usr/bin/env python3
import subprocess
import tempfile

with tempfile.TemporaryFile('r+') as output_file:
    with subprocess.Popen(['cat'],
                          stdin=subprocess.PIPE,
                          stdout=output_file,
                          universal_newlines=True) as process:
        for i in range(100000):
            print(i, file=process.stdin)
    output_file.seek(0)  # rewind (and sync with the disk)
    print(output_file.readline(), end='')  # get  the first line of the output

Если вход / выход мал (умещается в памяти); Вы можете передать все данные сразу и получить выходные данные, используя .communicate(), который читает / пишет одновременно для вас:

#!/usr/bin/env python3
import subprocess

cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]),
                    stdout=subprocess.PIPE, universal_newlines=True)
print(cp.stdout.splitlines()[-1]) # print the last line

Для одновременного чтения / записи вручную вы можете использовать потоки, asyncio, fcntl и т. Д. @ Джед предоставил простое решение на основе потоков . Вот решение на основе asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE

async def pump_input(writer):
     try:
         for i in range(100000):
             writer.write(b'%d\n' % i)
             await writer.drain()
     finally:
         writer.close()

async def run():
    # start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(pump_input(process.stdin)) # write input
    async for line in process.stdout: # consume output
        print(int(line)**2) # print squares
    return await process.wait()  # wait for the child process to exit


if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

В Unix вы можете использовать решение на основе fcntl:

#!/usr/bin/env python3
import sys
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK
from shutil import copyfileobj
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF

def make_blocking(pipe, blocking=True):
    fd = pipe.fileno()
    if not blocking:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) # set O_NONBLOCK
    else:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) # clear it


with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
    make_blocking(process.stdout, blocking=False)
    with process.stdin:
        for i in range(100000):
            #NOTE: the mode is block-buffered (default) and therefore
            # `cat` won't see it immidiately
            process.stdin.write(b'%d\n' % i)
            # a deadblock may happen here with a *blocking* pipe
            output = process.stdout.read(PIPE_BUF)
            if output is not None:
                sys.stdout.buffer.write(output)
    # read the rest
    make_blocking(process.stdout)
    copyfileobj(process.stdout, sys.stdout.buffer)
1 голос
/ 06 мая 2011

Для такого рода вещей оболочка работает намного лучше, чем подпроцесс.

Пишите очень простые приложения на Python, которые читают с sys.stdin и пишут на sys.stdout.

Соедините простые приложения вместе, используя конвейер оболочки.

Если хотите, запустите конвейер с помощью subprocess или просто напишите однострочный скрипт оболочки.

python part1.py | python part2.py

Это очень, очень эффективно. Он также переносим на все Linux (и Windows), если вы делаете его очень простым.

0 голосов
/ 18 октября 2017

Я искал пример кода для поэтапной итерации по выходным данным процесса, поскольку этот процесс потребляет свои данные от предоставленного итератора (также и по инкрементному).В основном:

import string
import random

# That's what I consider a very useful function, though didn't
# find any existing implementations.
def process_line_reader(args, stdin_lines):
    # args - command to run, same as subprocess.Popen
    # stdin_lines - iterable with lines to send to process stdin
    # returns - iterable with lines received from process stdout
    pass

# Returns iterable over n random strings. n is assumed to be infinity if negative.
# Just an example of function that returns potentially unlimited number of lines.
def random_lines(n, M=8):
    while 0 != n:
        yield "".join(random.choice(string.letters) for _ in range(M))
        if 0 < n:
            n -= 1

# That's what I consider to be a very convenient use case for
# function proposed above.
def print_many_uniq_numbered_random_lines():
    i = 0
    for line in process_line_reader(["uniq", "-i"], random_lines(100500 * 9000)):
        # Key idea here is that `process_line_reader` will feed random lines into
        # `uniq` process stdin as lines are consumed from returned iterable.
        print "#%i: %s" % (i, line)
        i += 1

Некоторые из предложенных здесь решений позволяют делать это с потоками (но это не всегда удобно) или с asyncio (который недоступен в Python 2.x).Ниже приведен пример работающей реализации, позволяющей это сделать.

import subprocess
import os
import fcntl
import select

class nonblocking_io(object):
    def __init__(self, f):
        self._fd = -1
        if type(f) is int:
            self._fd = os.dup(f)
            os.close(f)
        elif type(f) is file:
            self._fd = os.dup(f.fileno())
            f.close()
        else:
            raise TypeError("Only accept file objects or interger file descriptors")
        flag = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    def __enter__(self):
        return self
    def __exit__(self, type, value, traceback):
        self.close()
        return False
    def fileno(self):
        return self._fd
    def close(self):
        if 0 <= self._fd:
            os.close(self._fd)
            self._fd = -1

class nonblocking_line_writer(nonblocking_io):
    def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep):
        super(nonblocking_line_writer, self).__init__(f)
        self._lines = iter(lines)
        self._lines_ended = False
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._buffer_offset = 0
        self._buffer = bytearray()
        self._encoding = encoding
        self._linesep = bytearray(linesep, encoding)
    # Returns False when `lines` iterable is exhausted and all pending data is written
    def continue_writing(self):
        while True:
            if self._buffer_offset < len(self._buffer):
                n = os.write(self._fd, self._buffer[self._buffer_offset:])
                self._buffer_offset += n
                if self._buffer_offset < len(self._buffer):
                    return True
            if self._lines_ended:
                if self._autoclose:
                    self.close()
                return False
            self._buffer[:] = []
            self._buffer_offset = 0
            while len(self._buffer) < self._buffer_size:
                line = next(self._lines, None)
                if line is None:
                    self._lines_ended = True
                    break
                self._buffer.extend(bytearray(line, self._encoding))
                self._buffer.extend(self._linesep)

class nonblocking_line_reader(nonblocking_io):
    def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"):
        super(nonblocking_line_reader, self).__init__(f)
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._encoding = encoding
        self._file_ended = False
        self._line_part = ""
    # Returns (lines, more) tuple, where lines is iterable with lines read and more will
    # be set to False after EOF.
    def continue_reading(self):
        lines = []
        while not self._file_ended:
            data = os.read(self._fd, self._buffer_size)
            if 0 == len(data):
                self._file_ended = True
                if self._autoclose:
                    self.close()
                if 0 < len(self._line_part):
                    lines.append(self._line_part.decode(self._encoding))
                    self._line_part = ""
                break
            for line in data.splitlines(True):
                self._line_part += line
                if self._line_part.endswith(("\n", "\r")):
                    lines.append(self._line_part.decode(self._encoding).rstrip("\n\r"))
                    self._line_part = ""
            if len(data) < self._buffer_size:
                break
        return (lines, not self._file_ended)

class process_line_reader(object):
    def __init__(self, args, stdin_lines):
        self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self._reader = nonblocking_line_reader(self._p.stdout)
        self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines)
        self._iterator = self._communicate()
    def __iter__(self):
        return self._iterator
    def __enter__(self):
        return self._iterator
    def __exit__(self, type, value, traceback):
        self.close()
        return False
    def _communicate(self):
        read_set = [self._reader]
        write_set = [self._writer]
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error, e:
                if e.args[0] == errno.EINTR:
                    continue
                raise
            if self._reader in rlist:
                stdout_lines, more = self._reader.continue_reading()
                for line in stdout_lines:
                    yield line
                if not more:
                    read_set.remove(self._reader)
            if self._writer in wlist:
                if not self._writer.continue_writing():
                    write_set.remove(self._writer)
        self.close()
    def lines(self):
        return self._iterator
    def close(self):
        if self._iterator is not None:
            self._reader.close()
            self._writer.close()
            self._p.wait()
            self._iterator = None
0 голосов
/ 19 сентября 2016

Самое простое решение, которое я могу придумать:

from subprocess import Popen, PIPE
from threading import Thread

s = map(str,xrange(10000)) # a large string
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=1)
Thread(target=lambda: any((p.stdin.write(b) for b in s)) or p.stdin.close()).start()
print (p.stdout.read())

Буферизованная версия:

from subprocess import Popen, PIPE
from threading import Thread

s = map(str,xrange(10000)) # a large string
n = 1024 # buffer size
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=n)
Thread(target=lambda: any((p.stdin.write(c) for c in (s[i:i+n] for i in xrange(0, len(s), n)))) or p.stdin.close()).start()
print (p.stdout.read())
0 голосов
/ 08 июня 2016

Использование aiofiles & asyncio в python 3.5:

Немного сложно, но для записи в stdin требуется всего 1024 байта памяти!

import asyncio
import aiofiles
import sys
from os.path import dirname, join, abspath
import subprocess as sb


THIS_DIR = abspath(dirname(__file__))
SAMPLE_FILE = join(THIS_DIR, '../src/hazelnut/tests/stuff/sample.mp4')
DEST_PATH = '/home/vahid/Desktop/sample.mp4'


async def async_file_reader(f, buffer):
    async for l in f:
        if l:
            buffer.append(l)
        else:
            break
    print('reader done')


async def async_file_writer(source_file, target_file):
    length = 0
    while True:
        input_chunk = await source_file.read(1024)
        if input_chunk:
            length += len(input_chunk)
            target_file.write(input_chunk)
            await target_file.drain()
        else:
            target_file.write_eof()
            break

    print('writer done: %s' % length)


async def main():
    dir_name = dirname(DEST_PATH)
    remote_cmd = 'ssh localhost mkdir -p %s && cat - > %s' % (dir_name, DEST_PATH)

    stdout, stderr = [], []
    async with aiofiles.open(SAMPLE_FILE, mode='rb') as f:
        cmd = await asyncio.create_subprocess_shell(
            remote_cmd,
            stdin=sb.PIPE,
            stdout=sb.PIPE,
            stderr=sb.PIPE,
        )

        await asyncio.gather(*(
            async_file_reader(cmd.stdout, stdout),
            async_file_reader(cmd.stderr, stderr),
            async_file_writer(f, cmd.stdin)
        ))

        print('EXIT STATUS: %s' % await cmd.wait())

    stdout, stderr = '\n'.join(stdout), '\n'.join(stderr)

    if stdout:
        print(stdout)

    if stderr:
        print(stderr, file=sys.stderr)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Результат:

writer done: 383631
reader done
reader done
EXIT STATUS: 0
0 голосов
/ 06 мая 2011

Теперь я знаю, что это не удовлетворит пуриста в вас полностью, так как ввод должен уместиться в памяти, и у вас нет возможности интерактивно работать с вводом-выводом, но по крайней мере это отлично работает на вашем примере,Метод связи дополнительно принимает входные данные в качестве аргумента, и если вы подадите свой процесс на его входные данные таким образом, он будет работать.

import subprocess

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )

input = "".join('{0:d}\n'.format(i) for i in range(100000))
output = proc.communicate(input)[0]
print output

Что касается более крупной проблемы, вы можете создать подкласс Popen, переписать __init__принять потоковые объекты в качестве аргументов для stdin, stdout, stderr и переписать метод _communicate (волосатый для кроссплатформенности, вам нужно сделать это дважды, см. источник subprocess.py) для вызова read () в потоке stdinи напишите () вывод в потоки stdout и stderr.Что меня беспокоит в этом подходе, так это то, что, насколько я знаю, это еще не было сделано.Когда очевидные вещи раньше не делались, обычно есть причина (она не работает должным образом), но я не могу понять, почему это не так, кроме того факта, что вам нужны потоки для обеспечения безопасности потоков в Windows.

0 голосов
/ 06 мая 2011

Вот пример (Python 3) чтения по одной записи за раз из gzip с использованием канала:

cmd = 'gzip -dc compressed_file.gz'
pipe = Popen(cmd, stdout=PIPE).stdout

for line in pipe:
    print(":", line.decode(), end="")

Я знаю, что для этого есть стандартный модуль, он просто предназначен в качестве примера,Вы можете прочитать весь вывод за один раз (например, обратные тики оболочки), используя метод связи, но, очевидно, вы должны быть осторожны с размером памяти.

Вот пример (снова Python 3) записи записейк программе lp (1) в Linux:

cmd = 'lp -'
proc = Popen(cmd, stdin=PIPE)
proc.communicate(some_data.encode())
...