python: чтение вывода подпроцесса в потоках - PullRequest
9 голосов
/ 21 марта 2012

У меня есть исполняемый файл, который я вызываю с помощью subprocess.Popen.Затем я намереваюсь передать ему некоторые данные через стандартный ввод, используя поток, который считывает его значение из очереди, которая позже будет заполнена в другом потоке.Вывод должен быть прочитан с использованием канала stdout в другом потоке и снова отсортирован в очереди.

Насколько я понимаю из моего предыдущего исследования, использование потоков с очередью является хорошей практикой.

К сожалению, внешний исполняемый файл не даст быстрого ответа на каждую передаваемую строку, поэтому простые циклы записи и чтения не подходят.Исполняемый файл реализует некоторую внутреннюю многопоточность, и я хочу вывод, как только он станет доступен, поэтому дополнительный поток чтения.

В качестве примера для тестирования исполняемого файла будет просто перетасовывать каждую строку (shuffleline.py):

#!/usr/bin/python -u
import sys
from random import shuffle

for line in sys.stdin:
    line = line.strip()

    # shuffle line
    line = list(line)
    shuffle(line)
    line = "".join(line)

    sys.stdout.write("%s\n"%(line))
    sys.stdout.flush() # avoid buffers

Обратите внимание, что это уже настолько небуферизовано, насколько это возможно.Или нет?Это моя урезанная тестовая программа:

#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess

class WriteThread(threading.Thread):
    def __init__(self, p_in, source_queue):
        threading.Thread.__init__(self)
        self.pipe = p_in
        self.source_queue = source_queue

    def run(self):
        while True:
            source = self.source_queue.get()
            print "writing to process: ", repr(source)
            self.pipe.write(source)
            self.pipe.flush()
            self.source_queue.task_done()

class ReadThread(threading.Thread):
    def __init__(self, p_out, target_queue):
        threading.Thread.__init__(self)
        self.pipe = p_out
        self.target_queue = target_queue

    def run(self):
        while True:
            line = self.pipe.readline() # blocking read
            if line == '':
                break
            print "reader read: ", line.rstrip()
            self.target_queue.put(line)

if __name__ == "__main__":

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    source_queue = Queue.Queue()
    target_queue = Queue.Queue()

    writer = WriteThread(proc.stdin, source_queue)
    writer.setDaemon(True)
    writer.start()

    reader = ReadThread(proc.stdout, target_queue)
    reader.setDaemon(True)
    reader.start()

    # populate queue
    for i in range(10):
        source_queue.put("string %s\n" %i)
    source_queue.put("")

    print "source_queue empty: ", source_queue.empty()
    print "target_queue empty: ", target_queue.empty()

    import time
    time.sleep(2) # expect some output from reader thread

    source_queue.join() # wait until all items in source_queue are processed
    proc.stdin.close()  # should end the subprocess
    proc.wait()

это дает следующий вывод (python2.7):

writing to process:  'string 0\n'
writing to process:  'string 1\n'
writing to process:  'string 2\n'
writing to process:  'string 3\n'
writing to process:  'string 4\n'
writing to process:  'string 5\n'
writing to process:  'string 6\n'
source_queue empty: writing to process:  'string 7\n'
writing to process:  'string 8\n'
writing to process:  'string 9\n'
writing to process:  ''
 True
target_queue empty:  True

, затем ничего в течение 2 секунд ...

reader read:  rgsn0i t
reader read:  nrg1sti
reader read:  tis n2rg
reader read:  snt gri3
reader read:  nsri4 tg
reader read:  stir5 gn
reader read:   gnri6ts
reader read:   ngrits7
reader read:  8nsrt ig
reader read:  sg9 nitr

Ожидается чередование в начале.Однако вывод подпроцесса не появляется до после , когда подпроцесс завершается.С большим количеством строк я получаю некоторый вывод, поэтому я предполагаю проблему кеширования в канале stdout.Согласно другим вопросам, размещенным здесь, очистка stdout (в подпроцессе) должна работать, по крайней мере, в Linux.

1 Ответ

8 голосов
/ 22 марта 2012

Ваша проблема не имеет ничего общего с модулем или потоками subprocess (какими бы проблематичными они ни были) или даже смешиванием подпроцессов и потоков (очень плохая идея * , даже хуже, чем использование потоков для начала , если вы не используете бэкпорт модуля подпроцесса Python 3.2, который вы можете получить из code.google.com / p / python-subprocess32 ), или получаете доступ к одним и тем же вещам из нескольких потоков (как ваш print заявления делают.)

Что происходит, так это то, что ваша shuffleline.py программа буферизуется. Не на выходе, а на входе. Хотя это не очень очевидно, когда вы перебираете файловый объект, Python будет читать в блоках, обычно 8 Кбайт. Так как sys.stdin является файловым объектом, ваш цикл for будет буферизован до EOF или полного блока:

for line in sys.stdin:
    line = line.strip()
    ....

Если вы не хотите этого делать, используйте цикл while для вызова sys.stdin.readline() (который возвращает '' для EOF):

while True:
    line = sys.stdin.readline()
    if not line:
        break
    line = line.strip()
    ...

или используйте форму с двумя аргументами iter(), которая создает итератор, который вызывает первый аргумент до тех пор, пока не будет возвращен второй аргумент ("страж"):

for line in iter(sys.stdin.readline, ''):
    line = line.strip()
    ...

Я также был бы упущен, если бы не предлагал не использовать потоки для этого, а вместо этого вместо этого блокировать ввод-вывод в каналах подпроцесса или даже что-то вроде twisted.reactor.spawnProcess, в котором есть множество способов перехвата процессов и других вещей. вместе как потребители и производители.

...