Как правильно обрабатывать соединение Redis в Tornado?(Async - Pub / Sub) - PullRequest
8 голосов
/ 11 декабря 2011

Я использую Redis вместе с моим приложением Tornado с клиентом asyc Brukva, когда я посмотрел примеры приложений на сайте Brukva, они устанавливают новое соединение по методу " init " в websocket

class MessagesCatcher(tornado.websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        super(MessagesCatcher, self).__init__(*args, **kwargs)
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe('test_channel')

    def open(self):
        self.client.listen(self.on_message)

    def on_message(self, result):
        self.write_message(str(result.body))

    def close(self):
        self.client.unsubscribe('test_channel')
        self.client.disconnect()

хорошо в случае с websocket, но как с этим обращаться в обычном методе публикации Tornado RequestHandler, скажем, длинная операция опроса (модель публикации-подписки) . Я делаю новое клиентское соединение в каждом посте метод обработчика обновлений это правильный подход ?? Когда я проверил на консоли redis, я вижу, что количество клиентов увеличивается при каждой новой почтовой операции.

enter image description here

Вот пример моего кода.

c = brukva.Client(host = '127.0.0.1')
c.connect()

class MessageNewHandler(BaseHandler):
    @tornado.web.authenticated
    def post(self):

        self.listing_id = self.get_argument("listing_id")
        message = {
            "id": str(uuid.uuid4()),
            "from": str(self.get_secure_cookie("username")),
            "body": str(self.get_argument("body")),
        }
        message["html"] = self.render_string("message.html", message=message)

        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            c.publish(self.listing_id, message)
            logging.info("Writing message : " + json.dumps(message))
            self.write(json.dumps(message))

    class MessageUpdatesHandler(BaseHandler):
        @tornado.web.authenticated
        @tornado.web.asynchronous
        def post(self):
            self.listing_id = self.get_argument("listing_id", None)
            self.client = brukva.Client()
            self.client.connect()
            self.client.subscribe(self.listing_id)
            self.client.listen(self.on_new_messages)

        def on_new_messages(self, messages):
            # Closed client connection
            if self.request.connection.stream.closed():
                return
            logging.info("Getting update : " + json.dumps(messages.body))
            self.finish(json.dumps(messages.body))
            self.client.unsubscribe(self.listing_id)


        def on_connection_close(self):
            # unsubscribe user from channel
            self.client.unsubscribe(self.listing_id)
            self.client.disconnect()

Буду признателен, если вы предоставите пример кода для аналогичного случая.

Ответы [ 2 ]

11 голосов
/ 10 февраля 2013

Немного поздно, но я использую Торнадо-Редис . Работает с ioloop торнадо и модулем tornado.gen

Установка торнадоредиса

Может быть установлен из пипса

pip install tornadoredis

или с помощью setuptools

easy_install tornadoredis

но ты действительно не должен этого делать. Вы также можете клонировать хранилище и извлечь его. Затем запустите

python setup.py build
python setup.py install

Подключение к Redis

Следующий код входит в ваш main.py или эквивалентный

redis_conn = tornadoredis.Client('hostname', 'port')
redis_conn.connect()

redis.connect вызывается только один раз. Это блокирующий вызов, поэтому его следует вызывать перед запуском основного ioloop. Один и тот же объект соединения используется всеми обработчиками.

Вы можете добавить его в настройки своего приложения, например

settings = {
    redis = redis_conn
}
app = tornado.web.Application([('/.*', Handler),],
                              **settings)

Использовать торнадоредис

Соединение может использоваться в обработчиках как self.settings['redis'] или может быть добавлено как свойство класса BaseHandler. Ваш обработчик запросов подкласс этого класса и доступ к свойству.

class BaseHandler(tornado.web.RequestHandler):

    @property
    def redis():
        return self.settings['redis']

Для связи с Redis используются декораторы tornado.web.asynchronous и tornado.gen.engine

class SomeHandler(BaseHandler):

    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        foo = yield gen.Task(self.redis.get, 'foo')
        self.render('sometemplate.html', {'foo': foo}

Дополнительная информация

Больше примеров и других функций, таких как пул соединений и конвейеры, можно найти в репозитории github.

2 голосов
/ 29 февраля 2012

вы должны объединить соединения в вашем приложении. так как кажется, что brukva не поддерживает это автоматически (redis-py поддерживает это, но по своей природе блокирует, поэтому не подходит для торнадо), вам нужно написать свой собственный пул соединений.

модель довольно проста. что-то вроде этого (это не настоящий операционный код):

class BrukvaPool():

    __conns = {}


    def get(host, port,db):
        ''' Get a client for host, port, db '''

        key = "%s:%s:%s" % (host, port, db)

        conns = self.__conns.get(key, [])
        if conns:
            ret = conns.pop()
            return ret
        else:
           ## Init brukva client here and connect it

    def release(client):
        ''' release a client at the end of a request '''
        key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db)
        self.__conns.setdefault(key, []).append(client)

это может быть немного сложнее, но это главная идея.

...