У меня есть проект Django с открытой веб-розеткой Tornado, и я подписан на тему в моем Redis Pub / Sub. Я использую Asyncio и Aioredis. Перед обновлением страницы, закрытием браузера или переходом с этой страницы я призову закрыть веб-сокет, что откажется от этой темы.
Проблема в том, что иногда, когда я меняю страницы или обновляю страницу, куча сообщений из прошлого будет перекачана обратно во вновь открытую веб-розетку. Это происходит не каждый раз, и я не уверен, что еще я могу сделать, чтобы прошлые сообщения не возвращались при обновлении страницы. Я уже убедился, что веб-сокет закроется и отписаться от темы на странице обновления / выгрузки окна.
Сохраняет ли Redis Pub / Sub старые сообщения где-нибудь, пока клиент отписывается? А когда клиент подписывается обратно на ту же тему, старые сообщения рассылаются? Это нормальное поведение для Redis Pub / Sub? У меня сложилось впечатление, что Redis Pub / Sub не сохраняет сообщения, и если клиент отписался, сообщения просто удаляются для этого клиента.
Мне нужно убедиться, что при перезагрузке страницы старые сообщения не возвращаются обратно в веб-сокет.
Вот как я написал RedisChannel для выполнения функций pub / sub:
import aioredis
class RedisChannel(object):
'''
Redis backed pub-sub websocket channel.
'''
async def subscribe(self, **kwargs):
'''
Subscribe to topics
'''
topics = kwargs.get('topics')
return await self.conn.subscribe(*topics)
async def unsubscribe(self, **kwargs):
'''
Unsubscribe to topics
'''
topics = kwargs.get('topics')
return await self.conn.unsubscribe(*topics)
async def send(self, **kwargs):
data = {}
# If client socket is provided, only send to this socket.
ws = kwargs.get('ws')
# Topic for this message. Compulsory for broadcast.
topic = kwargs.get('topic')
# Usually JSON
if kwargs.get('data'):
data['data'] = kwargs.get('data')
# I'm using 60 seconds right now just to try to limit the list of past messages
# But the behaviour I need is 0 past messages on page reload in browser
push_event = True
if kwargs.get('timestamp'):
event_timestamp = kwargs.get("timestamp", 0)
data['timestamp'] = event_timestamp
# logger.debug(data)
current_time = timezone.now()
if event_timestamp:
event_dt = get_utc_time(datetime.utcfromtimestamp(event_timestamp))
if event_dt:
time_apart = current_time - event_dt
duration = abs(time_apart.total_seconds())
logger.debug("Time apart between event and current time = {}".format(duration))
if duration >= 60:
push_event = False
if not push_event:
data = {}
return await self.conn.publish_json(topic, json.dumps(data, separators=(',', ': ')))
async def connect(self):
redis_settings = settings['redis']['channel']
self.conn = await aioredis.create_redis_pool(
(
redis_settings.get('host'),
redis_settings.get('port')
),
db=redis_settings.get('db'),
minsize=2,
maxsize=redis_settings.get('max_connections'),
encoding='utf-8'
)
Вот как я написал обработчик websocket для подписки / отмены подписки на тему Redis:
import asyncio, json
ws_channel = RedisChannel()
asyncio.get_event_loop().create_task(ws_channel.connect())
async def reader(ch, ws):
while (await ch.wait_message()):
data = await ch.get_json()
if data:
ws.write_message(data)
await asyncio.sleep(0.001)
# time.sleep
class ResultsWsHandler(tornado.websocket.WebSocketHandler):
def open(self):
try:
self.write_message(json.dumps('Websocket opened.'))
except Exception as e:
logger.error(str(e))
def on_message(self, message):
asyncio.ensure_future(self.on_message_async(message))
async def on_message_async(self, message):
# async def on_message(self, message):
data = json.loads(message)
action = data.get('action', None)
topics = data.get('cameras', [])
if topics or action is not None:
try:
action = int(action)
if action == 0: # 0 - heartbeat
logger.debug('Heartbeat.')
param = {'type': 0}
self.write_message(json.dumps(param))
elif action == 1: # 1 - subscribe
channels = await ws_channel.subscribe(topics=topics)
logger.debug(f'Successfully subscribed from {topics}.')
self.write_message(json.dumps(f'Successfully subscribed to {topics}.'))
task_list = []
for c in channels:
task_list.append(asyncio.ensure_future(reader(c, self)))
await asyncio.wait(task_list)
elif action == 2: # 2 - unsubscribe
await ws_channel.unsubscribe(topics=topics)
logger.debug(f'Successfully unsubscribe from {topics}.')
self.write_message(json.dumps(f'Successfully unsubscribe from {topics}.'))
else:
logger.debug(f'Other: {data}')
except Exception as e:
logger.error(json.dumps(str(e), separators=(',', ': ')))
self.write_message(json.dumps(str(e), separators=(',', ': ')))