Redis Pub / Sub добавление дополнительных каналов в середине подписки - PullRequest
2 голосов
/ 14 марта 2011

Можно ли добавить дополнительные подписки для подключения Redis? У меня есть поток прослушивания, но, похоже, новые команды SUBSCRIBE на него не влияют.

Если это ожидаемое поведение, какой шаблон следует использовать, если пользователи добавляют ленту биржевых сообщений в свои интересы или присоединяются к чату?

Я хотел бы реализовать класс Python, похожий на:

import threading
import redis

class RedisPubSub(object):
    def __init__(self):
        self._redis_pub = redis.Redis(host='localhost', port=6379, db=0)        
        self._redis_sub = redis.Redis(host='localhost', port=6379, db=0)        
        self._sub_thread = threading.Thread(target=self._listen)
        self._sub_thread.setDaemon(True)
        self._sub_thread.start()

    def publish(self, channel, message):
        self._redis_pub.publish(channel, message)

    def subscribe(self, channel):
        self._redis_sub.subscribe(channel)

    def _listen(self):
        for message in self._redis_sub.listen():
            print message

Ответы [ 2 ]

5 голосов
/ 15 марта 2011

Классы python-redis Redis и ConnectionPool наследуются от threading.local, и это производит "магические" эффекты, которые вы видите.

Сводка : клиенты self._redis_sub вашего основного потока и рабочих потоков заканчивают тем, что использовали два разных соединения с сервером, но только соединение основного потока вызвало команду SUBSCRIBE.

Подробности : Поскольку основной поток создает self._redis_sub, этот клиент в конечном итоге помещается в локальное хранилище потока. Далее я предполагаю, что основной поток выполняет client.subscribe(channel) вызов. Теперь клиент основного потока подписан на соединение 1. Затем вы запускаете рабочий поток self._sub_thread, который в конечном итоге имеет свой собственный атрибут self._redis_sub, установленный на новый экземпляр redis.Client, который создает новый пул соединений и устанавливает новое соединение. на сервер Redis.

Это новое соединение еще не подписано на ваш канал, поэтому listen() немедленно возвращается. Таким образом, с python-redis вы не можете передавать установленное соединение с выдающимися подписками (или любыми другими командами с отслеживанием состояния) между потоками.

В зависимости от того, как вы планируете реализовать свое приложение, вам может потребоваться переключиться на использование другого клиента или придумать какой-то другой способ сообщить состояние подписки рабочим потокам, например, отправлять команды подписки через очередь.

Еще одна проблема заключается в том, что python-redis использует блокирующие сокеты, которые не позволяют вашему прослушивающему потоку выполнять другую работу во время ожидания сообщений, и он не может сигнализировать о своем желании отписаться, если это не происходит сразу после получения сообщения.

1 голос
/ 15 марта 2013

Асинхронный способ:

Витая рамка и штекер txredisapi

Пример кода (Подписаться:

import txredisapi as redis

from twisted.application import internet
from twisted.application import service


class myProtocol(redis.SubscriberProtocol):
    def connectionMade(self):
        print "waiting for messages..."
        print "use the redis client to send messages:"
        print "$ redis-cli publish chat test"
        print "$ redis-cli publish foo.bar hello world"
        self.subscribe("chat")
        self.psubscribe("foo.*")


        reactor.callLater(10, self.unsubscribe, "chat")
        reactor.callLater(15, self.punsubscribe, "foo.*")

        # self.continueTrying = False
        # self.transport.loseConnection()

    def messageReceived(self, pattern, channel, message):
        print "pattern=%s, channel=%s message=%s" % (pattern, channel, message)

    def connectionLost(self, reason):
        print "lost connection:", reason


class myFactory(redis.SubscriberFactory):
    # SubscriberFactory is a wapper for the ReconnectingClientFactory
    maxDelay = 120
    continueTrying = True
    protocol = myProtocol


application = service.Application("subscriber")
srv = internet.TCPClient("127.0.0.1", 6379, myFactory())
srv.setServiceParent(application)

Только один поток, без головной боли:)

Зависит от того, какое приложение и кодирование, конечно.В сетевом корпусе идут витые.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...