Я хотел бы отметить, что после игры я использовал продюсера, потребительскую архитектуру для достижения того, чего я хотел. Я ценю то, что в исходном вопросе я не четко изложил свой пример использования. Но вот упрощенный фрагмент того, что я в итоге реализовал:
import asyncio
import random
from datetime import datetime
from pydantic import BaseModel
class Measurement(BaseModel):
data: float
time: datetime
async def measure(queue: asyncio.Queue):
while True:
# Replicate blocking call to recieve data
await asyncio.sleep(1)
print("Measurement complete!")
for i in range(3):
data = Measurement(
data=random.random(),
time=datetime.utcnow()
)
await queue.put(data)
await queue.put(None)
async def process(queue: asyncio.Queue):
while True:
data = await queue.get()
print(f"Got measurement! {data}")
# Replicate pause for http request
await asyncio.sleep(0.3)
print("Sent data to server")
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
meansurement = measure(queue)
processor = process(queue)
loop.run_until_complete(asyncio.gather(processor, meansurement))
loop.close()
Я должен отметить здесь (что-то, что я не совсем понял), что обязательно, чтобы любые блокирующие вызовы, которые вы делаете, могли быть await
-ed. В противном случае вы можете обнаружить, что потребитель никогда не выполнит.