I was looking for sample code to iterate over a process's output incrementally, while that process consumes its input from a supplied iterator (also incrementally). Basically:
import string
import random

# That's what I consider a very useful function, though didn't
# find any existing implementations.
def process_line_reader(args, stdin_lines):
    # args - command to run, same as subprocess.Popen
    # stdin_lines - iterable with lines to send to process stdin
    # returns - iterable with lines received from process stdout
    pass

# Returns an iterable over n random strings. n is assumed to be infinite if negative.
# Just an example of a function that returns a potentially unlimited number of lines.
def random_lines(n, M=8):
    while 0 != n:
        yield "".join(random.choice(string.letters) for _ in range(M))
        if 0 < n:
            n -= 1

# That's what I consider to be a very convenient use case for
# the function proposed above.
def print_many_uniq_numbered_random_lines():
    i = 0
    for line in process_line_reader(["uniq", "-i"], random_lines(100500 * 9000)):
        # Key idea here is that `process_line_reader` will feed random lines into
        # the `uniq` process stdin as lines are consumed from the returned iterable.
        print "#%i: %s" % (i, line)
        i += 1
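With such a process_line_reader in place, the lazy feeding can be made visible with a small sketch like the one below. It assumes a Unix `cat` binary is available, and `logged_lines` is only a hypothetical helper added here for illustration; it is not part of the implementation.

def logged_lines(lines):
    # Hypothetical helper: report when each input line is actually pulled
    # from the iterator, so the interleaving with process output is visible.
    for line in lines:
        print "feeding: %s" % line
        yield line

def demo_interleaving():
    # `cat` simply echoes stdin to stdout; with a lazy process_line_reader
    # the "feeding" and "got" messages should interleave in chunks rather
    # than all input being written before any output is read.
    for line in process_line_reader(["cat"], logged_lines(random_lines(10000))):
        print "got: %s" % line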
Some of the solutions suggested here allow doing this with threads (which is not always convenient) or with asyncio (which is not available in Python 2.x). Below is an example of a working implementation that makes this possible.
import subprocess
import os
import fcntl
import select
import errno
class nonblocking_io(object):
    def __init__(self, f):
        self._fd = -1
        if type(f) is int:
            self._fd = os.dup(f)
            os.close(f)
        elif type(f) is file:
            self._fd = os.dup(f.fileno())
            f.close()
        else:
            raise TypeError("Only accepts file objects or integer file descriptors")
        flag = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()
        return False

    def fileno(self):
        return self._fd

    def close(self):
        if 0 <= self._fd:
            os.close(self._fd)
            self._fd = -1
class nonblocking_line_writer(nonblocking_io):
    def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep):
        super(nonblocking_line_writer, self).__init__(f)
        self._lines = iter(lines)
        self._lines_ended = False
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._buffer_offset = 0
        self._buffer = bytearray()
        self._encoding = encoding
        self._linesep = bytearray(linesep, encoding)

    # Returns False when the `lines` iterable is exhausted and all pending data is written
    def continue_writing(self):
        while True:
            if self._buffer_offset < len(self._buffer):
                n = os.write(self._fd, self._buffer[self._buffer_offset:])
                self._buffer_offset += n
                if self._buffer_offset < len(self._buffer):
                    return True
            if self._lines_ended:
                if self._autoclose:
                    self.close()
                return False
            self._buffer[:] = []
            self._buffer_offset = 0
            while len(self._buffer) < self._buffer_size:
                line = next(self._lines, None)
                if line is None:
                    self._lines_ended = True
                    break
                # Accept both unicode and already-encoded byte strings
                if isinstance(line, unicode):
                    self._buffer.extend(line.encode(self._encoding))
                else:
                    self._buffer.extend(line)
                self._buffer.extend(self._linesep)
class nonblocking_line_reader(nonblocking_io):
    def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"):
        super(nonblocking_line_reader, self).__init__(f)
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._encoding = encoding
        self._file_ended = False
        self._line_part = ""

    # Returns a (lines, more) tuple, where lines is an iterable with lines read and more will
    # be set to False after EOF.
    def continue_reading(self):
        lines = []
        while not self._file_ended:
            data = os.read(self._fd, self._buffer_size)
            if 0 == len(data):
                self._file_ended = True
                if self._autoclose:
                    self.close()
                if 0 < len(self._line_part):
                    lines.append(self._line_part.decode(self._encoding))
                    self._line_part = ""
                break
            for line in data.splitlines(True):
                self._line_part += line
                if self._line_part.endswith(("\n", "\r")):
                    lines.append(self._line_part.decode(self._encoding).rstrip("\n\r"))
                    self._line_part = ""
            if len(data) < self._buffer_size:
                break
        return (lines, not self._file_ended)
class process_line_reader(object):
    def __init__(self, args, stdin_lines):
        self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self._reader = nonblocking_line_reader(self._p.stdout)
        self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines)
        self._iterator = self._communicate()

    def __iter__(self):
        return self._iterator

    def __enter__(self):
        return self._iterator

    def __exit__(self, type, value, traceback):
        self.close()
        return False

    def _communicate(self):
        read_set = [self._reader]
        write_set = [self._writer]
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error, e:
                if e.args[0] == errno.EINTR:
                    continue
                raise
            if self._reader in rlist:
                stdout_lines, more = self._reader.continue_reading()
                for line in stdout_lines:
                    yield line
                if not more:
                    read_set.remove(self._reader)
            if self._writer in wlist:
                if not self._writer.continue_writing():
                    write_set.remove(self._writer)
        self.close()

    def lines(self):
        return self._iterator

    def close(self):
        if self._iterator is not None:
            self._reader.close()
            self._writer.close()
            self._p.wait()
            self._iterator = None
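With the classes above, the use case from the beginning works as intended. A minimal way to try it, assuming the random_lines generator from the first snippet is in scope and a `uniq` binary is available, is a sketch along these lines; the context-manager form is used so the pipes are closed and the child process is reaped even if iteration stops early.

if __name__ == "__main__":
    # Pipe a finite stream of random lines through `uniq -i`; lines are
    # written to the child's stdin only as the output iterable is consumed,
    # so memory usage stays bounded no matter how long the stream is.
    with process_line_reader(["uniq", "-i"], random_lines(1000)) as lines:
        i = 0
        for line in lines:
            print "#%i: %s" % (i, line)
            i += 1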