Огненная сопрограмма от instide a for loop - PullRequest
1 голос
/ 15 октября 2019

Я пытаюсь запустить сопрограмму из цикла. Вот простой пример того, чего я пытаюсь достичь:

import time
import random
import asyncio


def listen():
    while True:
       yield random.random()
       time.sleep(3)


async def dosomething(data: float):
    print("Working on data", data)
    asyncio.sleep(2)
    print("Processed data!")


async def main():
    for pos in listen():
        asyncio.create_task(dosomething(pos))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

К сожалению, это не работает, и моя dosomething сопрограмма никогда не выполняется ... что я делаю неправильно?

Ответы [ 2 ]

1 голос
/ 15 октября 2019

asyncio.create_task Функция нацелена на расписание Выполнение задачи, следует дождаться ее завершения.

Более того, asyncio.sleep(2) в вашемкод также должен ожидаться, иначе он выдаст ошибку / предупреждение.

Правильный путь:

import time
import random
import asyncio


def listen():
    while True:
       yield random.random()
       time.sleep(3)


async def dosomething(data: float):
    print("Working on data", data)
    await asyncio.sleep(2)
    print("Processed data!")


async def main():
    for pos in listen():
        await asyncio.create_task(dosomething(pos))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Пример вывода:

Working on data 0.9645515392725723
Processed data!
Working on data 0.9249656672476657
Processed data!
Working on data 0.13635467058997397
Processed data!
Working on data 0.03941252405458562
Processed data!
Working on data 0.6299882183389822
Processed data!
Working on data 0.9143748948769984
Processed data!
...
0 голосов
/ 16 октября 2019

Я хотел бы отметить, что после игры я использовал продюсера, потребительскую архитектуру для достижения того, чего я хотел. Я ценю то, что в исходном вопросе я не четко изложил свой пример использования. Но вот упрощенный фрагмент того, что я в итоге реализовал:

import asyncio
import random
from datetime import datetime
from pydantic import BaseModel


class Measurement(BaseModel):
    data: float
    time: datetime


async def measure(queue: asyncio.Queue):
    while True:
        # Replicate blocking call to recieve data
        await asyncio.sleep(1)
        print("Measurement complete!")
        for i in range(3):
            data = Measurement(
                data=random.random(),
                time=datetime.utcnow()
            )
            await queue.put(data)

    await queue.put(None)


async def process(queue: asyncio.Queue):
    while True:
        data = await queue.get()
        print(f"Got measurement! {data}")
        # Replicate pause for http request
        await asyncio.sleep(0.3)
        print("Sent data to server")


loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
meansurement = measure(queue)
processor = process(queue)
loop.run_until_complete(asyncio.gather(processor, meansurement))
loop.close()

Я должен отметить здесь (что-то, что я не совсем понял), что обязательно, чтобы любые блокирующие вызовы, которые вы делаете, могли быть await -ed. В противном случае вы можете обнаружить, что потребитель никогда не выполнит.

...