Возможен ли неблокирующий паб Redis? - PullRequest
25 голосов
/ 24 октября 2011

Я хочу использовать redis 'pubsub для передачи некоторых сообщений, но не хочу, чтобы его блокировали с помощью listen, как показано в коде ниже:

import redis
rc = redis.Redis()

ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])

rc.publish('foo', 'hello world')

for item in ps.listen():
    if item['type'] == 'message':
        print item['channel']
        print item['data']

Последняя секция for заблокируется.Я просто хочу проверить, есть ли данные на данном канале, как я могу это сделать?Есть ли check как метод?

Ответы [ 10 ]

43 голосов
/ 08 января 2013

Если вы думаете о неблокирующей асинхронной обработке, вы, вероятно, используете (или должны использовать) асинхронный фреймворк / сервер.

  • если вы используете Торнадо , существует Торнадо-Редис . Это использует родные вызовы генератора Торнадо. В демоверсии Websocket приведен пример ее использования в сочетании с pub / sub.

  • если вы используете Twisted , существует txRedis . Там у вас также есть pub / sub example .

  • также кажется, что вы можете использовать Redis-py в сочетании с Gevent без проблем, используя Gevent's patching (gevent.monkey.patch_all()).

UPDATE: Прошло 5 лет с момента первоначального ответа, в то время как Python получил поддержку встроенного асинхронного ввода-вывода . Теперь есть AIORedis, асинхронный клиент IO Redis .

14 голосов
/ 24 февраля 2016

Принятый ответ устарел, так как redis-py рекомендует использовать неблокирующую get_message(). Но это также дает возможность легко использовать потоки.

https://pypi.python.org/pypi/redis

Существует три различных способа чтения сообщений.

За кулисами get_message () использует системный модуль «select» для быстрого опроса сокета соединения. Если есть данные, доступные для чтения, get_message () прочитает их, отформатирует сообщение и вернет его или передаст обработчику сообщений. Если нет данных для чтения, get_message () немедленно вернет None. Это упрощает интеграцию в существующий цикл событий внутри вашего приложения.

 while True:
     message = p.get_message()
     if message:
         # do something with the message
     time.sleep(0.001)  # be nice to the system :)

Более старые версии redis-py читают только сообщения с pubsub.listen (). listen () - это генератор, который блокирует доступ к сообщению. Если вашему приложению не нужно ничего делать, кроме как получать и обрабатывать сообщения, полученные от redis, listen () - это простой способ начать работу.

 for message in p.listen():
     # do something with the message

Третий вариант запускает цикл обработки событий в отдельном потоке. pubsub.run_in_thread () создает новый поток и запускает цикл обработки событий. Объект потока возвращается вызывающей стороне run_in_thread (). Вызывающая сторона может использовать метод thread.stop (), чтобы завершить цикл обработки событий и поток. За кулисами это просто оболочка для get_message (), которая запускается в отдельном потоке, по сути, создавая для вас крошечный неблокирующий цикл событий. run_in_thread () принимает необязательный аргумент sleep_time. Если указано, цикл обработки событий будет вызывать time.sleep () со значением в каждой итерации цикла.

Примечание. Поскольку мы работаем в отдельном потоке, невозможно обработать сообщения, которые не обрабатываются автоматически с помощью зарегистрированных обработчиков сообщений. Поэтому redis-py не позволяет вам вызывать run_in_thread (), если вы подписаны на шаблоны или каналы, к которым не подключены обработчики сообщений.

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()

Итак, чтобы ответить на ваш вопрос, просто проверьте get_message, когда вы хотите знать, пришло ли сообщение.

13 голосов
/ 26 июня 2014

Новая версия Redis-Py имеет поддержку асинхронного pubsub, проверьте https://github.com/andymccurdy/redis-py для более подробной информацииВот пример из самой документации:

while True:
    message = p.get_message()
    if message:
        # do something with the message
    time.sleep(0.001)  # be nice to the system :)
7 голосов
/ 10 января 2012

Это рабочий пример для потоковой работы блокирующего слушателя.

import sys
import cmd
import redis
import threading


def monitor():
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0)

    channel = sys.argv[1]
    p = r.pubsub()
    p.subscribe(channel)

    print 'monitoring channel', channel
    for m in p.listen():
        print m['data']


class my_cmd(cmd.Cmd):
    """Simple command processor example."""

    def do_start(self, line):
        my_thread.start()

    def do_EOF(self, line):
        return True


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print "missing argument! please provide the channel name."
    else:
        my_thread = threading.Thread(target=monitor)
        my_thread.setDaemon(True)

        my_cmd().cmdloop()
7 голосов
/ 24 октября 2011

Я не думаю, что это было бы возможно.Канал не имеет «текущих данных», вы подписываетесь на канал и начинаете получать сообщения, которые другие клиенты отправляют на канал, следовательно, это блокирующий API.Также, если вы посмотрите на документацию Redis Commands для pub / sub, это сделает ее более понятной.

2 голосов
/ 06 декабря 2014

Вот неблокирующее решение без потоков:

fd = ps.connection._sock.fileno();
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block
if rlist:
    for rfd in rlist:
        if fd == rfd:
            message = ps.get_message()

ps.get_message() достаточно само по себе, но я использую этот метод, так что я могу ждать несколько fds вместо одного только подключения redis.

1 голос
/ 24 января 2013

Наиболее эффективный подход будет основан на гринлетах, а не на потоках. Как основанная на гринлете среда параллелизма, Gevent уже достаточно хорошо зарекомендовал себя в мире Python. Поэтому интеграция Gevent с Redis-Py была бы замечательной. Это именно то, что обсуждается в этом выпуске на github:

https://github.com/andymccurdy/redis-py/issues/310

1 голос
/ 24 октября 2011

Чтобы достичь не блокирующего кода, вы должны выполнить другой тип кода парадигмы. Это не сложно, используя новый поток для прослушивания всех изменений и оставляя основной поток для других дел.

Также вам понадобится какой-то механизм для обмена данными между основным потоком и потоком подписчика redis.

0 голосов
/ 24 марта 2014

Redis 'pub / sub отправляет сообщения клиентам, подписанным (прослушивающим) на канале. Если вы не слушаете, вы пропустите сообщение (отсюда и блокирующий вызов). Если вы хотите, чтобы он был неблокирующим, я рекомендую использовать очередь (redis тоже неплохо справляется с этим). Если вам нужно использовать pub / sub, вы можете использовать как предложенный gevent, чтобы иметь асинхронный блокирующий прослушиватель, отправлять сообщения в очередь и использовать отдельного потребителя для обработки сообщений из этой очереди неблокирующим способом.

0 голосов
/ 28 ноября 2012

Вы можете использовать gevent, gevent patching для создания неблокирующего приложения redis pubsub.

...