Что можно сделать, когда издатель намного быстрее подписчика? - PullRequest
0 голосов
/ 06 марта 2020

У меня есть модель издатель-подписчик, где пользователь подключается через веб-сокет и подписывается на канал. Издатель публикует сообщения со скоростью 3000 мсг / с, а другой абонент может обрабатывать 2000 мсг / с, поэтому из-за этой задержки на стороне подписчика остальные сообщения отправляются в буфер повторного выполнения. Таким образом, даже если я продолжу работу издателя в течение 10 минут, а затем остановлю его, подписчик все равно будет обрабатывать сообщения, потому что сообщения находятся в буфере и из-за этого будет задержка, и отправка сообщений пользователю получит с задержкой.

Пример кода издателя -

import redis
import asyncio
import random
import time

def connectRedis():
    redisConn = redis.StrictRedis(host="127.0.0.1", port=6379)
    return redisConn

def publish():
    pubSub = connectRedis()
    channel = "FY-CH"
    clientIdStr = "FA"
    count = 1
    while True:
        numStr = random.randint(0,99)

        clientId = clientIdStr+"%s"%str(numStr)
        orderstr = 'orderUpdate|0|700|2073|%s|0|3572|W|1264268746|C|3668|%s|%s|1000000001283150|M|B|2|0|0|0|1|1|0|0|0|1.109500|0.000000|320012304041.000000|1|-1|0.000000|A|0.000000|0|B|A|C||||1|CONFIRMED||1||N|0.000000|B||N|N|AFKPY8N'%(clientId, clientId, clientId)
        pubSub.publish(channel, orderstr)
        print("Published for {}".format(clientId))

if __name__ == '__main__':
    publish()

Пример кода подписчика -

class RedisSub():

    subscriberDict = None
    iwsSubsDict = None ## It will contain the connection objects of the user connected
    count = 0

    def __init__(self, arg):
        RedisSub.count      += 1
        self.channelList    = []
        self.redisCli       = tornadis.PubSubClient(host=127.0.0.1)
        if len(str(chanlName)) > 0:
            self.chanlName  = chanlName
        else:
            self.chanlName  = "Channel "+str(RedisSub.count)
        self.startRecv()

    @gen.coroutine
    def startRecv(self):
        while True:
            try:
                subMess = yield self.redisCli.pubsub_pop_message(SUBSCRIPTION_WAIT_TIME)
                logging.info("subMess:{}".format(subMess))
                if not subMess: ## When timeout happens this will return None
                    continue
                if not isinstance(subMess, list):
                    logging.error(subMess)
                    yield gen.Task(IOLoop.instance().add_timeout, time.time() + 2)## Avoide infinite loop
                    self.reSub()
                    continue


                if RedisSub.iwsSubsDict:
                    dataRxList = tornado.escape.to_unicode(subMess[2])
                    dataRxList = dataRxList.split("|")
                    logging.info(dataRxList)
                    if int(dataRxList[2]) <= 0: ## If message length is 0 then it is an invalid data
                        continue
                    userId = dataRxList[12]
                    if userId in RedisSub.iwsSubsDict:
                        logging.info(RedisSub.iwsSubsDict)
                        connDict = {}
                        for appId in RedisSub.iwsSubsDict[userId]:
                            connDict.update(RedisSub.iwsSubsDict[userId][appId])
                        for conn in connDict:
                            data = "|".join(map(str,dataRxList))
                            conn.write_message(data)

Я использую торнадо и торнадис и запускаю абонента перед началом события l oop сам. Если кто-то может помочь мне относительно того, что еще можно сделать в этом, или я делаю что-то не так в коде?

...