Поток данных в памяти через Python Подпроцесс 'Popen over external command - PullRequest
1 голос
/ 06 марта 2020

Чего я хочу достичь

  • Я хочу построчно выполнять потоковую передачу элементов, похожих на генератор, из внешней программы из Python ,
  • Сломано, я хочу что-то вроде Generator -> Popen(...) -> Generator, не храня слишком много данных в памяти.

Вот рабочий, минимальный пример, который демонстрирует, чего я хочу достичь:


    from io import StringIO
    from subprocess import Popen, PIPE
    import time

    proc_input = StringIO("aa\nbb\ncc\ndd")
    proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
    for line in  proc_input:
        proc.stdin.write(line.encode())
        yield proc.stdout.readline()
        time.sleep(1)

Проблема : proc.stdout.readline() просто блокирует и ничего не показывает.

Что я уже выучил :

  • Если входные данные поступают из файлового объекта (то есть с чем-то реализованным fileno()), я могу передать это напрямую в stdin и избежать записи в PIPE. Но для этого мне нужно сначала направить генератор в файл, который я хотел бы избежать, так как это кажется ненужным обходным путем. Например, работает следующее:

    import tempfile
    from subprocess import Popen, PIPE

    tp = tempfile.TemporaryFile()
    tp.write("aa\nbb\ncc\ndd".encode())
    tp.seek(0)
    proc = Popen(["cat"], stdin=tp, stdout=PIPE)
    for line in proc.stdout:
        print(line)

  • Если я продолжу запись в объект PIPE, я могу решить проблему, закрыв входной поток, а затем прочитав из выходного потока. Но здесь я не знаю, где в настоящее время живут данные. Потому что мой генератор выдает ГБ данных, я не хочу работать с ошибками памяти.

    proc_input = StringIO("aa\nbb\ncc\ndd")
    proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
    for line in  proc_input:
        proc.stdin.write(line.encode())
    proc.stdin.close()

    for line in proc.stdout:
            print(line)

То, что я тоже пробовал :

  • Я поиграл с аргументом размера буфера Popen(..., bufsize=), но, похоже, он не имел никакого эффекта.
  • Я попытался записать входные данные в io.BufferedWriter с надеждой, что Попен сможет переварить это как вход для STDIN. Также безуспешно.

Дополнительная информация : Я использую Linux.

Примечания к комментариям

Было предложено разбить входной генератор на куски. Это может быть достигнуто через

   def PopenStreaming(process, popen_kwargs, nlines, input):
        while input:
            proc = Popen(process, stdin=PIPE, stdout=PIPE, **popen_kwargs)
            for n, row in enumerate(input):
                proc.stdin.write(row)
                if n == nlines:
                    proc.stdin.close()
                    break
            for row in proc.stdout:
                yield row

1 Ответ

1 голос
/ 06 марта 2020

Я не уверен, что всегда можно сделать то, что ты пытаешься сделать. Документы на https://docs.python.org/3/library/subprocess.html скажем

Предупреждение: используйте communicate() вместо .stdin.write, .stdout.read или .stderr.read, чтобы избежать взаимоблокировок из-за любых других Каналы конвейера ОС заполняют и блокируют дочерний процесс.

Итак, вы должны использовать communicate, но это означает ожидание завершения процесса:

Popen.communicate(input=None, timeout=None) Взаимодействовать с процессом: отправлять данные в stdin. Читайте данные из stdout и stderr, пока не будет достигнут конец файла. Дождитесь завершения процесса.

Это означает, что вы сможете использовать communicate только один раз, а это не то, что вам нужно.

Однако я думаю использование текстового режима с линейной буферизацией должно быть безопасным, чтобы избежать тупиковой блокировки:

from subprocess import Popen, PIPE

kwargs = {
    "stdin": PIPE,
    "stdout": PIPE,
    "universal_newlines": True,  # text mode
    "bufsize": 1,  # line buffered
}

with Popen(["cat"], **kwargs) as process:
    for data in ["A\n", "B\n", "C\n"]:
        process.stdin.write(data)
        print("data sent:", data)
        output = process.stdout.readline()
        print("output received:", output)

Если это не применимо в вашем случае, возможно, вы можете разделить свой вызов на несколько меньших вызовов? Использование check_output с аргументом ключевого слова input также может упростить ваш код:

from subprocess import check_output
output = check_output(["cat"], input=b"something\n")
print(output)
...