У меня есть веб-приложение. Это приложение имеет конечную точку для передачи некоторых данных объекта на канал redis
.
А другая конечная точка обрабатывает websocket
соединение, где эти данные выбираются из канала и отправляются клиенту через ws
.
Когда я подключаюсь через ws, сообщения получают только первый подключенный клиент.
Как читать сообщения с redis
канала с несколькими клиентами и не создавать новую подписку?
Обработчик Websocket.
Здесь я подписываюсь на канал, сохраняю его в приложении (init_tram_channel
). Затем запустите задание, где я слушаю канал и отправляю сообщения (run_tram_listening
).
@routes.get('/tram-state-ws/{tram_id}')
async def tram_ws(request: web.Request):
ws = web.WebSocketResponse()
await ws.prepare(request)
tram_id = int(request.match_info['tram_id'])
channel_name = f'tram_{tram_id}'
await init_tram_channel(channel_name, request.app)
tram_job = await run_tram_listening(
request=request,
ws=ws,
channel=request.app['tram_producers'][channel_name]
)
request.app['websockets'].add(ws)
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
break
if msg.type == aiohttp.WSMsgType.ERROR:
logging.error(f'ws connection was closed with exception {ws.exception()}')
else:
await asyncio.sleep(0.005)
except asyncio.CancelledError:
pass
finally:
await tram_job.close()
request.app['websockets'].discard(ws)
return ws
Подписка и сохранение канала.
Каждый канал связан с уникальным объектом, и чтобы не создавать много каналов, связанных с одним и тем же объектом, я сохраняю только один в приложении.
app['tram_producers']
является диктом.
async def init_tram_channel(
channel_name: str,
app: web.Application
):
if channel_name not in app['tram_producers']:
channel, = await app['redis'].subscribe(channel_name)
app['tram_producers'][channel_name] = channel
Запуск Coro для прослушивания каналов.
Я запускаю его через aiojobs:
async def run_tram_listening(
request: web.Request,
ws: web.WebSocketResponse,
channel: Channel
):
"""
:return: aiojobs._job.Job object
"""
listen_redis_job = await spawn(
request,
_read_tram_subscription(
ws,
channel
)
)
return listen_redis_job
Коро, где я слушаю и отправляю сообщения:
async def _read_tram_subscription(
ws: web.WebSocketResponse,
channel: Channel
):
try:
async for msg in channel.iter():
tram_data = msg.decode()
await ws.send_json(tram_data)
except asyncio.CancelledError:
pass
except Exception as e:
logging.error(msg=e, exc_info=e)