Кажется, есть два способа сделать pub / sub:
- использовать channel.wait_message () и channel.get ()
- использовать Receiver.iter ()
В документе говорится, что wait_message () блокируется при ожидании новых сообщений.Я провел некоторый тест с измененным примером aioredis, и они, кажется, работают и не блокируются.Они эквивалентны?Нужно ли добавить небольшой сон в ридере?Если нет, то как aioredis управляет частотой опроса?
import asyncio
import aioredis
import random
from itertools import count
async def reader(ch):
# while await ch.wait_message():
# msg = await ch.get_json()
async for msg in ch.iter(encoding='utf-8'):
print("Got Message:", msg)
**# await asyncio.sleep(.1) # is this necessary?**
async def sender(pub):
while True:
pub.publish_json('chan:1', ["Hello", "world"])
await asyncio.sleep(random.randint(1, 5))
async def foo():
while True:
print('bar')
await asyncio.sleep(random.random())
async def main():
pub = await aioredis.create_redis(
'redis://localhost')
sub = await aioredis.create_redis(
'redis://localhost')
ch1, = await sub.subscribe('chan:1')
tsk = asyncio.ensure_future(reader(ch1))
tsk1 = asyncio.ensure_future(foo())
tsk2 = asyncio.ensure_future(sender(pub))
await foo()
await sub.unsubscribe('chan:1')
await tsk
tsk1.cancel()
await tsk1
tsk2.cancel()
await tsk2
sub.close()
pub.close()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
try:
pass
except KeyboardInterrupt:
for task in asyncio.all_tasks():
task.cancel()