Я использую aiohttp lib для работы с WebSockets. Также используйте aio_pika для подписки на каналы кролика.
Когда я получил новое подключение пользователя, я создаю новую задачу, которая:
1. создать новый канал
2. подписаться на новую очередь.
Вот код:
Обработчик WebSockets:
async def index(request):
ws_current = web.WebSocketResponse()
ws_ready = ws_current.can_prepare(request)
queue_name = get_unique_queue_name()
request.app['users'][queue_name] = ws_current
await ws_current.prepare(request)
#create new task
task = asyncio.create_task(create_queue(queue_name, request.app))
# other code...
Вот функция create_task:
async def create_queue(queue_name, app):
connection = app.rabbit # connection we made in Application.cleanup_ctx()
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(
queue_name,
auto_delete=True
)
async for message in queue:
with message.process():
ws = app['users'][queue_name]
import json
# send to client
await ws.send_json(json.loads(message.body))
# have to close channel
await channel.close()
Есть ли правильный способ закрыть задачу «create_queue»? С близкими каналами конечно.
Переместите функцию Declare_queue в функцию «index», а в «Create_queue» - просто прослушайте канал, я думаю, что это не очень хорошая идея, потому что в этом случае теряется отдельно между двумя задачами.