Чтение потокового вывода из подпроцесса asyn c - PullRequest
1 голос
/ 18 января 2020

Я пытаюсь прочитать URL-адреса из программы, запущенной в подпроцессе, а затем запланировать асинхронный HTTP-запрос, но похоже, что запросы выполняются синхронно. Это потому, что подпроцесс и запросы выполняются в одной и той же функции сопрограммы?

test.py

import random
import time

URLS = ['http://example.com', 'http://example.com/sleep5s']

def main():
    for url in random.choices(URLS, weights=(1, 1), k=5):
        print(url)
        time.sleep(random.uniform(0.5, 1))


if __name__ == '__main__':
    main()

main.py

import asyncio
import sys

import httpx

from  httpx.exceptions import TimeoutException


async def req(url):
    async with httpx.AsyncClient() as client:
        try:
            r = await client.get(url, timeout=2)
            print(f'Response {url}: {r.status_code}')
        except Exception as TimeoutException:
            print(f'TIMEOUT - {url}')
        except Exception as exc:
            print(f'ERROR - {url}')


async def run():
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        '-u',
        'test.py',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    while True:
        line = await proc.stdout.readline()
        if not line:
            break

        url = line.decode().rstrip()
        print(f'Found URL: {url}')

        resp = await req(url)

    await proc.wait()


async def main():
    await run()


if __name__ == '__main__':
    asyncio.run(main())

Test

$ python main.py
Found URL: http://example.com
Response http://example.com: 200
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
Found URL: http://example.com
Response http://example.com: 200
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s

1 Ответ

1 голос
/ 19 января 2020

похоже, что запросы выполняются синхронно. Это потому, что подпроцесс и запросы выполняются в одной и той же функции сопрограммы?

Ваш диагноз правильный. await означает то, что написано на банке: сопрограмма не будет действовать, пока не будет получен результат. К счастью, asyncio облегчает запуск сопрограммы в фоновом режиме:

    tasks = []
    while True:
        line = await proc.stdout.readline()
        if not line:
            break

        url = line.decode().rstrip()
        print(f'Found URL: {url}')

        tasks.append(asyncio.create_task(req(url)))

    resps = asyncio.gather(*tasks)
    await proc.wait()

Обратите внимание, что:

  • asyncio.create_task() гарантирует, что запросы начнут обрабатываться, даже когда мы все еще читаем строки
  • asyncio.gather() гарантируют, что все задачи фактически ожидаются до завершения сопрограммы. Он также обеспечивает доступ к ответам и распространяет исключения, если таковые имеются.
...