У меня есть исполняемый файл, который я вызываю с помощью subprocess.Popen.Затем я намереваюсь передать ему некоторые данные через стандартный ввод, используя поток, который считывает его значение из очереди, которая позже будет заполнена в другом потоке.Вывод должен быть прочитан с использованием канала stdout в другом потоке и снова отсортирован в очереди.
Насколько я понимаю из моего предыдущего исследования, использование потоков с очередью является хорошей практикой.
К сожалению, внешний исполняемый файл не даст быстрого ответа на каждую передаваемую строку, поэтому простые циклы записи и чтения не подходят.Исполняемый файл реализует некоторую внутреннюю многопоточность, и я хочу вывод, как только он станет доступен, поэтому дополнительный поток чтения.
В качестве примера для тестирования исполняемого файла будет просто перетасовывать каждую строку (shuffleline.py):
#!/usr/bin/python -u
import sys
from random import shuffle
for line in sys.stdin:
line = line.strip()
# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)
sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers
Обратите внимание, что это уже настолько небуферизовано, насколько это возможно.Или нет?Это моя урезанная тестовая программа:
#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess
class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue
def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()
class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue
def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)
if __name__ == "__main__":
cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
source_queue = Queue.Queue()
target_queue = Queue.Queue()
writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()
reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()
# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")
print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()
import time
time.sleep(2) # expect some output from reader thread
source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()
это дает следующий вывод (python2.7):
writing to process: 'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True
, затем ничего в течение 2 секунд ...
reader read: rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr
Ожидается чередование в начале.Однако вывод подпроцесса не появляется до после , когда подпроцесс завершается.С большим количеством строк я получаю некоторый вывод, поэтому я предполагаю проблему кеширования в канале stdout.Согласно другим вопросам, размещенным здесь, очистка stdout (в подпроцессе) должна работать, по крайней мере, в Linux.