Неблокирующее чтение на подпроцесс. PIPE в Python - PullRequest
462 голосов
/ 17 декабря 2008

Я использую модуль подпроцесса , чтобы запустить подпроцесс и подключиться к его выходному потоку (stdout). Я хочу иметь возможность выполнять неблокирующие чтения на своем стандартном выводе. Есть ли способ сделать .readline неблокирующим или проверить, есть ли данные в потоке, прежде чем я вызову .readline? Я хотел бы, чтобы это было переносимо или, по крайней мере, работало под Windows и Linux.

вот как я делаю это сейчас (блокировка на .readline, если нет доступных данных):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

Ответы [ 27 ]

379 голосов
/ 04 февраля 2011

fcntl, select, asyncproc в этом случае не помогут.

Надежный способ чтения потока без блокировки независимо от операционной системы заключается в использовании Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
72 голосов
/ 28 ноября 2009

У меня часто была похожая проблема; Программы на Python, которые я пишу часто, должны иметь возможность выполнять некоторые основные функции, одновременно принимая пользовательский ввод из командной строки (stdin). Простое помещение функции обработки пользовательского ввода в другой поток не решает проблему, потому что readline() блокируется и не имеет времени ожидания. Если основная функциональность завершена и больше нет необходимости ждать дальнейшего ввода данных пользователем, я обычно хочу, чтобы моя программа завершила работу, но это не может быть, потому что readline() все еще блокируется в другом потоке, ожидая строки. Решение, которое я нашел для этой проблемы, состоит в том, чтобы сделать stdin неблокирующим файлом с помощью модуля fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

На мой взгляд, это немного чище, чем использование модулей выбора или сигнала для решения этой проблемы, но опять же, это работает только в UNIX ...

37 голосов
/ 20 декабря 2013

В Python 3.4 представлен новый временный API для асинхронного ввода-вывода - asyncio module .

Подход похож на twisted ответ на основе @Bryan Ward - определить протокол и его методы вызываются, как только данные готовы:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

См. «Подпроцесс» в документации .

Существует высокоуровневый интерфейс asyncio.create_subprocess_exec(), который возвращает Process объектов , который позволяет асинхронно читать строку, используя StreamReader.readline() coroutine async / await Python 3.5+ синтаксис ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() выполняет следующие задачи:

  • запустить подпроцесс, перенаправить его стандартный поток в канал
  • асинхронно читать строку из stdout подпроцесса
  • уничтожить подпроцесс
  • дождаться выхода

Каждый шаг при необходимости может быть ограничен тайм-аутом.

20 голосов
/ 13 января 2009

Попробуйте модуль asyncproc . Например:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Модуль позаботится обо всех потоках, как это предложено S.Lott.

17 голосов
/ 22 апреля 2011

Вы можете сделать это очень легко в Twisted . В зависимости от существующей кодовой базы, это может быть не так просто использовать, но если вы создаете искаженное приложение, такие вещи становятся почти тривиальными. Вы создаете класс ProcessProtocol и переопределяете метод outReceived(). Twisted (в зависимости от используемого реактора) обычно представляет собой большой цикл select() с обратными вызовами, установленными для обработки данных из различных файловых дескрипторов (часто сетевых сокетов). Таким образом, метод outReceived() - это просто установка обратного вызова для обработки данных, поступающих из STDOUT. Простой пример, демонстрирующий это поведение, выглядит следующим образом:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Документация Twisted содержит полезную информацию по этому вопросу.

Если вы строите все свое приложение на Twisted, оно делает асинхронную связь с другими процессами, локальными или удаленными, действительно элегантной, как это. С другой стороны, если ваша программа построена не на Twisted, это не очень полезно. Надеемся, что это может быть полезно для других читателей, даже если это не применимо для вашего конкретного приложения.

17 голосов
/ 26 января 2011

Используйте выбор и чтение (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Для readline () - как:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
8 голосов
/ 17 декабря 2008

Одним из решений является создание другого процесса для выполнения чтения процесса или создание потока процесса с таймаутом.

Вот потоковая версия функции тайм-аута:

http://code.activestate.com/recipes/473878/

Тем не менее, вам нужно читать стандартный вывод по мере его поступления? Другим решением может быть выгрузка вывода в файл и ожидание завершения процесса с помощью p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
7 голосов
/ 15 марта 2013

Существующие решения не работают для меня (подробности ниже). В конце концов, нам удалось реализовать readline с использованием read (1) (на основе этого ответа ). Последний не блокирует:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Почему существующие решения не работают:

  1. Решения, которые требуют readline (в том числе на основе очереди), всегда блокируются. Трудно (невозможно?) Убить поток, который выполняет readline. Он будет уничтожен только после завершения процесса, который его создал, но не когда процесс, производящий вывод, будет уничтожен.
  2. Смешивание низкоуровневого fcntl с высокоуровневыми вызовами readline может работать некорректно, как указывал anonnn.
  3. Использование select.poll () удобно, но не работает в Windows в соответствии с документами Python.
  4. Использование сторонних библиотек кажется излишним для этой задачи и добавляет дополнительные зависимости.
7 голосов
/ 06 июля 2012

Отказ от ответственности: это работает только для торнадо

Вы можете сделать это, установив fd как неблокирующее, а затем использовать ioloop для регистрации обратных вызовов. Я упаковал это в яйцо с именем tornado_subprocess , и вы можете установить его через PyPI:

easy_install tornado_subprocess

теперь вы можете сделать что-то вроде этого:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

вы также можете использовать его с RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
5 голосов
/ 24 мая 2016

Эта версия неблокирующего чтения не требует специальных модулей и будет работать "из коробки" на большинстве дистрибутивов Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...