Будет ли Redis Pub / Sub сохранять или сохранять прошлые данные, когда клиент отписался от темы? - PullRequest
0 голосов
/ 01 апреля 2019

У меня есть проект Django с открытой веб-розеткой Tornado, и я подписан на тему в моем Redis Pub / Sub. Я использую Asyncio и Aioredis. Перед обновлением страницы, закрытием браузера или переходом с этой страницы я призову закрыть веб-сокет, что откажется от этой темы.

Проблема в том, что иногда, когда я меняю страницы или обновляю страницу, куча сообщений из прошлого будет перекачана обратно во вновь открытую веб-розетку. Это происходит не каждый раз, и я не уверен, что еще я могу сделать, чтобы прошлые сообщения не возвращались при обновлении страницы. Я уже убедился, что веб-сокет закроется и отписаться от темы на странице обновления / выгрузки окна.

Сохраняет ли Redis Pub / Sub старые сообщения где-нибудь, пока клиент отписывается? А когда клиент подписывается обратно на ту же тему, старые сообщения рассылаются? Это нормальное поведение для Redis Pub / Sub? У меня сложилось впечатление, что Redis Pub / Sub не сохраняет сообщения, и если клиент отписался, сообщения просто удаляются для этого клиента.

Мне нужно убедиться, что при перезагрузке страницы старые сообщения не возвращаются обратно в веб-сокет.

Вот как я написал RedisChannel для выполнения функций pub / sub:

import aioredis
class RedisChannel(object):
    '''
    Redis backed pub-sub websocket channel.
    '''
    async def subscribe(self, **kwargs):
        '''
        Subscribe to topics
        '''
        topics = kwargs.get('topics')
        return await self.conn.subscribe(*topics)

    async def unsubscribe(self, **kwargs):
        '''
        Unsubscribe to topics
        '''
        topics = kwargs.get('topics')
        return await self.conn.unsubscribe(*topics)

    async def send(self, **kwargs):
        data = {}

        # If client socket is provided, only send to this socket.
        ws = kwargs.get('ws')

        # Topic for this message. Compulsory for broadcast.
        topic = kwargs.get('topic')

        # Usually JSON
        if kwargs.get('data'):
            data['data'] = kwargs.get('data')

        # I'm using 60 seconds right now just to try to limit the list of past messages
        # But the behaviour I need is 0 past messages on page reload in browser
        push_event = True
        if kwargs.get('timestamp'):
            event_timestamp = kwargs.get("timestamp", 0)
            data['timestamp'] = event_timestamp
            # logger.debug(data)
            current_time = timezone.now()
            if event_timestamp:
                event_dt = get_utc_time(datetime.utcfromtimestamp(event_timestamp))
                if event_dt:
                    time_apart = current_time - event_dt
                    duration = abs(time_apart.total_seconds())
                    logger.debug("Time apart between event and current time = {}".format(duration))
                    if duration >= 60:
                        push_event = False

        if not push_event:
            data = {}

        return await self.conn.publish_json(topic, json.dumps(data, separators=(',', ': ')))

    async def connect(self):
        redis_settings = settings['redis']['channel']

        self.conn = await aioredis.create_redis_pool(
                (
                    redis_settings.get('host'),
                    redis_settings.get('port')
                ),
                db=redis_settings.get('db'),
                minsize=2,
                maxsize=redis_settings.get('max_connections'),
                encoding='utf-8'
            )

Вот как я написал обработчик websocket для подписки / отмены подписки на тему Redis:

import asyncio, json

ws_channel = RedisChannel()
asyncio.get_event_loop().create_task(ws_channel.connect())

async def reader(ch, ws):
    while (await ch.wait_message()):
        data = await ch.get_json()

        if data:
            ws.write_message(data)
        await asyncio.sleep(0.001)
        # time.sleep

class ResultsWsHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        try:
            self.write_message(json.dumps('Websocket opened.'))
        except Exception as e:
            logger.error(str(e))

    def on_message(self, message):
        asyncio.ensure_future(self.on_message_async(message))

    async def on_message_async(self, message):
    # async def on_message(self, message):
        data = json.loads(message)

        action = data.get('action', None)

        topics = data.get('cameras', [])

        if topics or action is not None:
            try:
                action = int(action)

                if action == 0:  # 0 - heartbeat
                    logger.debug('Heartbeat.')

                    param = {'type': 0}
                    self.write_message(json.dumps(param))

                elif action == 1:  # 1 - subscribe
                    channels = await ws_channel.subscribe(topics=topics)

                    logger.debug(f'Successfully subscribed from {topics}.')
                    self.write_message(json.dumps(f'Successfully subscribed to {topics}.'))

                    task_list = []
                    for c in channels:
                        task_list.append(asyncio.ensure_future(reader(c, self)))

                    await asyncio.wait(task_list)

                elif action == 2:  # 2 - unsubscribe
                    await ws_channel.unsubscribe(topics=topics)

                    logger.debug(f'Successfully unsubscribe from {topics}.')
                    self.write_message(json.dumps(f'Successfully unsubscribe from {topics}.'))

                else:
                    logger.debug(f'Other: {data}')

            except Exception as e:
                logger.error(json.dumps(str(e), separators=(',', ': ')))
                self.write_message(json.dumps(str(e), separators=(',', ': ')))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...