outReceived из витого ProcessProtocol объединяет сообщения, если они получены слишком быстро (проблема с буферизацией?) - PullRequest
0 голосов
/ 19 сентября 2019

Я использую Klein, микро-фреймворк на основе Twisted.У меня есть сервер (работающий на Windows!), Который будет порождать внешний длительный процесс (сквозной тест) с помощьюctor.spawnProcess ().Чтобы отправить информацию о состоянии работающего теста, я реализовал ProcessProtocol:

class IPCProtocol(protocol.ProcessProtocol):
    def __init__(self, status: 'Status', history: 'History'):
        super().__init__()
        self.status: Status = status
        self.history: History = history
        self.pid = None

    def connectionMade(self):
        self.pid = self.transport.pid
        log.msg("process started, pid={}".format(self.pid))

    def processExited(self, reason):
        log.msg("process exited, status={}".format(reason.value.exitCode))
        # add current run to history
        self.history.add(self.status.current_run)
        # create empty testrun and save status
        self.status.current_run = Testrun()
        self.status.status = StatusEnum.ready
        self.status.save()
        # check for more queue items
        if not self.status.queue.is_empty():
            start_testrun()

    def outReceived(self, data: bytes):
        data = data.decode('utf-8').strip()
        if data.startswith(constants.LOG_PREFIX_FAILURE):
            self.failureReceived()
        if data.startswith(constants.LOG_PREFIX_SERVER):
            data = data[len(constants.LOG_PREFIX_SERVER):]
            log.msg("Testrunner: " + data)
            self.serverMsgReceived(data)

Я запускаю процесс с помощью следующей команды:

ipc_protocol = IPCProtocol(status=app.status, history=app.history)
args = [sys.executable, 'testrunner.py', next_entry.suite, json.dumps(next_entry.testscripts)]
log.msg("Starting testrunn.py with args: {}".format(args))
reactor.spawnProcess(ipc_protocol, sys.executable, args=args)

Чтобы отправить информацию, я просто распечатаю сообщения(с префиксом, чтобы различать их) в моем testrunner.py.

Проблема в том, что если я отправлю команды печати на быстрый, то outReceived объединит сообщения.

Я уже пытался добавить flush=True для print() вызовов во внешнем процессе, но это не решило проблему.Другой вопрос предложил использовать usePTY=True для spawnProcess, но это не поддерживается в Windows.Есть ли лучший способ исправить это, чем добавить небольшую задержку (например, time.sleep(0.1)) к каждому вызову print()?

1 Ответ

1 голос
/ 19 сентября 2019

Вы не сказали этого, но похоже, что дочерний процесс записывает строки в свой стандартный вывод.Вам нужно проанализировать выходные данные, чтобы найти границы линий, если вы хотите работать с этими линиями.

Вы можете использовать LineOnlyReceiver, чтобы помочь вам в этом.Поскольку процессы не являются потоковыми транспортами, вы не можете просто использовать LineOnlyReceiver напрямую.Вы должны адаптировать его к интерфейсу протокола процесса.Вы можете сделать это самостоятельно или использовать ProcessEndpoint (вместо spawnProcess), чтобы сделать это за вас.

Например:

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import ProcessEndpoint
from twisted.internet import reactor

endpoint = ProcessEndpoint(reactor, b"/some/some-executable", ...)
spawning_deferred = endpoint.connect(Factory.forProtocol(LineOnlyReceiver))
...
...