Как реализовать мультипотребитель с одним производителем с помощью aioredis pub / sub - PullRequest
0 голосов
/ 12 января 2019

У меня есть веб-приложение. Это приложение имеет конечную точку для передачи некоторых данных объекта на канал redis.
А другая конечная точка обрабатывает websocket соединение, где эти данные выбираются из канала и отправляются клиенту через ws.

Когда я подключаюсь через ws, сообщения получают только первый подключенный клиент.

Как читать сообщения с redis канала с несколькими клиентами и не создавать новую подписку?

Обработчик Websocket.
Здесь я подписываюсь на канал, сохраняю его в приложении (init_tram_channel). Затем запустите задание, где я слушаю канал и отправляю сообщения (run_tram_listening).

@routes.get('/tram-state-ws/{tram_id}')
async def tram_ws(request: web.Request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    tram_id = int(request.match_info['tram_id'])
    channel_name = f'tram_{tram_id}'

    await init_tram_channel(channel_name, request.app)
    tram_job = await run_tram_listening(
        request=request,
        ws=ws,
        channel=request.app['tram_producers'][channel_name]
    )

    request.app['websockets'].add(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                    break
            if msg.type == aiohttp.WSMsgType.ERROR:
                logging.error(f'ws connection was closed with exception {ws.exception()}')
            else:
                await asyncio.sleep(0.005)
    except asyncio.CancelledError:
        pass
    finally:
        await tram_job.close()
        request.app['websockets'].discard(ws)

    return ws

Подписка и сохранение канала.
Каждый канал связан с уникальным объектом, и чтобы не создавать много каналов, связанных с одним и тем же объектом, я сохраняю только один в приложении. app['tram_producers'] является диктом.

async def init_tram_channel(
        channel_name: str,
        app: web.Application
):
    if channel_name not in app['tram_producers']:
        channel, = await app['redis'].subscribe(channel_name)
        app['tram_producers'][channel_name] = channel

Запуск Coro для прослушивания каналов. Я запускаю его через aiojobs:

async def run_tram_listening(
        request: web.Request,
        ws: web.WebSocketResponse,
        channel: Channel
):
    """
    :return: aiojobs._job.Job object
    """
    listen_redis_job = await spawn(
        request,
        _read_tram_subscription(
            ws,
            channel
        )
    )
    return listen_redis_job

Коро, где я слушаю и отправляю сообщения:

async def _read_tram_subscription(
        ws: web.WebSocketResponse,
        channel: Channel
):
    try:
        async for msg in channel.iter():
            tram_data = msg.decode()
            await ws.send_json(tram_data)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logging.error(msg=e, exc_info=e)

Ответы [ 2 ]

0 голосов
/ 14 января 2019

Следующий код был найден в некоторых выпусках aioredis github (я принял его к своей задаче).

class TramProducer:
    def __init__(self, channel: aioredis.Channel):
        self._future = None
        self._channel = channel

    def __aiter__(self):
        return self

    def __anext__(self):
        return asyncio.shield(self._get_message())

    async def _get_message(self):
        if self._future:
            return await self._future

        self._future = asyncio.get_event_loop().create_future()
        message = await self._channel.get_json()
        future, self._future = self._future, None
        future.set_result(message)
        return message

Итак, как это работает? TramProducer оборачивает способ, которым мы получаем сообщения.
Как сказала @Messa

сообщение получено из одной подписки Redis только один раз.

Таким образом, только один клиент TramProducer извлекает сообщения из Redis, в то время как другие клиенты ждут будущих результатов, которые будут установлены после получения сообщения из канала.

Если инициализирована self._future, это означает, что кто-то ждет сообщения от redis, поэтому мы просто дождемся результата self._future.

Использование TramProducer (я взял пример из моего вопроса):

async def _read_tram_subscription(
        ws: web.WebSocketResponse,
        tram_producer: TramProducer
):
    try:
        async for msg in tram_producer:
            await ws.send_json(msg)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logging.error(msg=e, exc_info=e)

Инициализация TramProducer:

async def init_tram_channel(
        channel_name: str,
        app: web.Application
):
    if channel_name not in app['tram_producers']:
        channel, = await app['redis'].subscribe(channel_name)
        app['tram_producers'][channel_name] = TramProducer(channel)

Я думаю, что это может быть полезно для кого-то.
Полный проект здесь https://gitlab.com/tram-emulator/tram-server

0 голосов
/ 13 января 2019

Я полагаю, что сообщение получено из одной подписки Redis только один раз, и если в вашем приложении более одного прослушивателя, то только один из них получит его.

Таким образом, вам нужно создать что-то вроде мини-паба / подпрограммы внутри приложения, чтобы распространять сообщения среди всех слушателей (в данном случае соединения через веб-сокет).

Некоторое время назад я сделал пример чата aiohttp websocket - не с Redis, но, по крайней мере, есть кросс-сокетный дистрибутив: https://github.com/messa/aiohttp-nextjs-demo-chat/blob/master/chat_web/views/api.py

Ключ заключается в том, чтобы иметь в приложении message_subcriptions, где каждое подключение веб-сокета регистрируется само по себе, или, возможно, его собственное asyncio.Queue (я использовал Event в моем примере, но это неоптимально) и всякий раз, когда сообщение приходит от Redis, оно отправляется во все соответствующие очереди.

Конечно, когда соединение с веб-сокетом завершается (отмена подписки, отключение, сбой клиента ...), очередь должна быть удалена (и, возможно, подписка Redis отменена, если она была последним подключением, которое его прослушивало).

Asyncio не означает, что мы должны забыть об очередях :) Также полезно ознакомиться с объединением нескольких задач одновременно (чтение из веб-сокета, чтение из очереди сообщений, возможно чтение из некоторой очереди уведомлений ...). Использование очередей также может помочь вам более аккуратно обрабатывать повторные подключения клиентов (без потери каких-либо сообщений).

...