Чего я хочу достичь
- Я хочу построчно выполнять потоковую передачу элементов, похожих на генератор, из внешней программы из Python ,
- Сломано, я хочу что-то вроде
Generator -> Popen(...) -> Generator
, не храня слишком много данных в памяти.
Вот рабочий, минимальный пример, который демонстрирует, чего я хочу достичь:
from io import StringIO
from subprocess import Popen, PIPE
import time
proc_input = StringIO("aa\nbb\ncc\ndd")
proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
for line in proc_input:
proc.stdin.write(line.encode())
yield proc.stdout.readline()
time.sleep(1)
Проблема : proc.stdout.readline()
просто блокирует и ничего не показывает.
Что я уже выучил :
- Если входные данные поступают из файлового объекта (то есть с чем-то реализованным
fileno()
), я могу передать это напрямую в stdin и избежать записи в PIPE. Но для этого мне нужно сначала направить генератор в файл, который я хотел бы избежать, так как это кажется ненужным обходным путем. Например, работает следующее:
import tempfile
from subprocess import Popen, PIPE
tp = tempfile.TemporaryFile()
tp.write("aa\nbb\ncc\ndd".encode())
tp.seek(0)
proc = Popen(["cat"], stdin=tp, stdout=PIPE)
for line in proc.stdout:
print(line)
- Если я продолжу запись в объект PIPE, я могу решить проблему, закрыв входной поток, а затем прочитав из выходного потока. Но здесь я не знаю, где в настоящее время живут данные. Потому что мой генератор выдает ГБ данных, я не хочу работать с ошибками памяти.
proc_input = StringIO("aa\nbb\ncc\ndd")
proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
for line in proc_input:
proc.stdin.write(line.encode())
proc.stdin.close()
for line in proc.stdout:
print(line)
То, что я тоже пробовал :
- Я поиграл с аргументом размера буфера
Popen(..., bufsize=)
, но, похоже, он не имел никакого эффекта. - Я попытался записать входные данные в
io.BufferedWriter
с надеждой, что Попен сможет переварить это как вход для STDIN. Также безуспешно.
Дополнительная информация : Я использую Linux.
Примечания к комментариям
Было предложено разбить входной генератор на куски. Это может быть достигнуто через
def PopenStreaming(process, popen_kwargs, nlines, input):
while input:
proc = Popen(process, stdin=PIPE, stdout=PIPE, **popen_kwargs)
for n, row in enumerate(input):
proc.stdin.write(row)
if n == nlines:
proc.stdin.close()
break
for row in proc.stdout:
yield row