Как использовать asyncio для чтения из подпроцесса с использованием SubprocessProtocol и завершить этот подпроцесс в произвольной точке? - PullRequest
2 голосов
/ 10 июня 2019

Используя ответы здесь в качестве основы (в которой используется SubprocessProtocol), я просто пытаюсь прочитать из подпроцесса и прекратить чтение (и завершить подпроцесс) в точке моего выбора ( например, я прочитал достаточно данных).

Обратите внимание, что я хочу получить выгоду от использования run_until_complete для другого обсуждения .

Я использую Windows, и в приведенном ниже примере используется cat от Cygwin. Фактическая утилита, которую я использую - это просто консольное приложение Windows, но оно будет транслироваться до тех пор, пока оно не закроется вручную.

Я могу читать данные очень хорошо, но мои попытки остановить чтение и закрыть подпроцесс (например, вызвать loop.stop () из pipe_data_received()) приводят к исключениям (RuntimeError: Event loop is closed и ValueError: I/O operation on closed pipe). Я хотел бы немедленно завершить подпроцесс изящно .

Я не думаю, что это столько платформа, сколько я не вижу, где правильно прервать вещи, чтобы получить желаемый эффект. Любые идеи о том, как это сделать?

Мой код Python 3.7+ (измененный из примера):

import asyncio
import os

external_program = "cat"  # Something that will output to stdio
external_option = "a"  # An arbitrarily large amount of data
saved_data = []

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            data_len = len(data)
            print(''.join(' {:02x}'.format(x) for x in data), flush=True)
            saved_data.extend(data)
            if len(saved_data) > 512:  # Stop once we've read this much data
                loop.call_soon_threadsafe(loop.stop)
    def connection_lost(self, exc):
        print("Connection lost")
        loop.stop() # end loop.run_forever()

print("START")

if os.name == 'nt':
    # On Windows, the ProactorEventLoop is necessary to listen on pipes
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(
        loop.subprocess_exec(
            SubprocessProtocol, 
            external_program,
            external_option,
        )
    )
    loop.run_forever()
finally:
    loop.close()

print("DONE")
loop.close()

1 Ответ

1 голос
/ 10 июня 2019

Не эксперт asyncio, но что-то вроде этого должно работать.

import time
import asyncio
import threading

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def __init__(self, loop):
        self.transport = None
        self.loop = loop

    def pipe_data_received(self, fd, data):
        print('data received')

    def connection_lost(self, exc):
        print("Connection lost")

    def connection_made(self, transport):
        print("Connection made")

        self.transport = transport

        # becasue calc won't call pipe_data_received method.
        t = threading.Thread(target=self._endme)
        t.setDaemon(True)
        t.start()

    def _endme(self):
        time.sleep(2)
        # You'd normally use these inside pipe_data_received, connection_lost methods
        self.transport.close()
        self.loop.stop()


def main():
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(loop.subprocess_exec(
            lambda: SubprocessProtocol(loop), 
            'calc.exe'
        ))

    loop.run_forever()
    loop.close()

if __name__ == "__main__":
    main()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...