У меня есть модель издатель-подписчик, где пользователь подключается через веб-сокет и подписывается на канал. Издатель публикует сообщения со скоростью 3000 мсг / с, а другой абонент может обрабатывать 2000 мсг / с, поэтому из-за этой задержки на стороне подписчика остальные сообщения отправляются в буфер повторного выполнения. Таким образом, даже если я продолжу работу издателя в течение 10 минут, а затем остановлю его, подписчик все равно будет обрабатывать сообщения, потому что сообщения находятся в буфере и из-за этого будет задержка, и отправка сообщений пользователю получит с задержкой.
Пример кода издателя -
import redis
import asyncio
import random
import time
def connectRedis():
redisConn = redis.StrictRedis(host="127.0.0.1", port=6379)
return redisConn
def publish():
pubSub = connectRedis()
channel = "FY-CH"
clientIdStr = "FA"
count = 1
while True:
numStr = random.randint(0,99)
clientId = clientIdStr+"%s"%str(numStr)
orderstr = 'orderUpdate|0|700|2073|%s|0|3572|W|1264268746|C|3668|%s|%s|1000000001283150|M|B|2|0|0|0|1|1|0|0|0|1.109500|0.000000|320012304041.000000|1|-1|0.000000|A|0.000000|0|B|A|C||||1|CONFIRMED||1||N|0.000000|B||N|N|AFKPY8N'%(clientId, clientId, clientId)
pubSub.publish(channel, orderstr)
print("Published for {}".format(clientId))
if __name__ == '__main__':
publish()
Пример кода подписчика -
class RedisSub():
subscriberDict = None
iwsSubsDict = None ## It will contain the connection objects of the user connected
count = 0
def __init__(self, arg):
RedisSub.count += 1
self.channelList = []
self.redisCli = tornadis.PubSubClient(host=127.0.0.1)
if len(str(chanlName)) > 0:
self.chanlName = chanlName
else:
self.chanlName = "Channel "+str(RedisSub.count)
self.startRecv()
@gen.coroutine
def startRecv(self):
while True:
try:
subMess = yield self.redisCli.pubsub_pop_message(SUBSCRIPTION_WAIT_TIME)
logging.info("subMess:{}".format(subMess))
if not subMess: ## When timeout happens this will return None
continue
if not isinstance(subMess, list):
logging.error(subMess)
yield gen.Task(IOLoop.instance().add_timeout, time.time() + 2)## Avoide infinite loop
self.reSub()
continue
if RedisSub.iwsSubsDict:
dataRxList = tornado.escape.to_unicode(subMess[2])
dataRxList = dataRxList.split("|")
logging.info(dataRxList)
if int(dataRxList[2]) <= 0: ## If message length is 0 then it is an invalid data
continue
userId = dataRxList[12]
if userId in RedisSub.iwsSubsDict:
logging.info(RedisSub.iwsSubsDict)
connDict = {}
for appId in RedisSub.iwsSubsDict[userId]:
connDict.update(RedisSub.iwsSubsDict[userId][appId])
for conn in connDict:
data = "|".join(map(str,dataRxList))
conn.write_message(data)
Я использую торнадо и торнадис и запускаю абонента перед началом события l oop сам. Если кто-то может помочь мне относительно того, что еще можно сделать в этом, или я делаю что-то не так в коде?