Могу ли я писать и читать из процесса в том же asyncio l oop? - PullRequest
0 голосов
/ 26 марта 2020

У меня есть процесс, который читает входные данные и записывает выходные данные, например: doubler :
(на самом деле это черный ящик, а вход и выход полностью независимы )

#!/bin/bash
while read -r i; do
    sleep 0.$RANDOM
    echo $((i*2))
done

и несколько функций в моем коде Python, который асинхронно передает этот процесс:

import asyncio
import subprocess
import random

class Feeder:
    def __init__(self):
        self.process = subprocess.Popen(['doubler.sh'],
                                        stdin=subprocess.PIPE)

    def feed(self, value):
        self.process.stdin.write(str(value).encode() + b'\n')
        self.process.stdin.flush()

feeder = Feeder()

async def feed_random():
    while True:
        feeder.feed(random.randint(0, 100))
        await asyncio.sleep(1)

async def feed_tens():
    while True:
        feeder.feed(10)
        await asyncio.sleep(3.14)

async def main():
    await asyncio.gather(
        feed_random(),
        feed_tens(),
    )

if __name__ == '__main__':
    asyncio.run(main())

Это работает хорошо. Но я также хотел бы прочитать результаты процесса, например:

...
stdout=subprocess.PIPE
...
for line in feeder.process.stdout:
    print("The answer is " + line.decode())

, но это блокирует, поэтому кормление не произойдет. Можно ли это сделать в том же asyncio l oop? Или мне нужен другой поток?

1 Ответ

1 голос
/ 27 марта 2020

Нечто подобное должно работать. Чтобы читать из stdout асинхронно, вам нужно переключиться на asyncio.subprocess.

import asyncio
import random

class Feeder:
    def __init__(self):
        self.process = None

    async def start_process(self):
        self.process = await asyncio.create_subprocess_exec('./doubler.sh',
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)

    async def feed(self, value):
        self.process.stdin.write(str(value).encode() + b'\n')
        await self.process.stdin.drain()

feeder = Feeder()

async def feed_random():
    while True:
        asyncio.ensure_future(feeder.feed(random.randint(0, 100)))
        await asyncio.sleep(1)

async def feed_tens():
    while True:
        asyncio.ensure_future(feeder.feed(10))
        await asyncio.sleep(3.14)

async def read_feed():
    while True:
        line = await feeder.process.stdout.readline()
        print("The answer is " + line.decode('utf-8'))


async def main():
    await feeder.start_process()
    await asyncio.gather(
        feed_random(),
        feed_tens(),
        read_feed()
    )

if __name__ == '__main__':
    asyncio.run(main())
...