Twisted: почему передача отложенного обратного вызова в отложенный поток внезапно блокирует поток? - PullRequest
7 голосов
/ 18 марта 2010

Я безуспешно пытался использовать txredis (неблокирующий витой API для redis) для постоянной очереди сообщений, которую я пытаюсь настроить с помощью проекта scrapy, над которым я работаю. Я обнаружил, что, хотя клиент не блокировал, он стал намного медленнее, чем мог бы быть, потому что то, что должно было быть одним событием в контуре реактора, было разделено на тысячи шагов.

Поэтому вместо этого я попытался использовать redis-py (обычный блокирующий витой API) и обернуть вызов в отложенный поток. Это прекрасно работает, однако я хочу выполнить внутреннюю отсрочку, когда я выполняю вызов redis, поскольку я хотел бы настроить пул соединений в попытках ускорить процесс.

Ниже представлена ​​моя интерпретация некоторого примера кода, взятого из витых документов для отложенного потока, чтобы проиллюстрировать мой вариант использования:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

А вот мое изменение для пула соединений, которое делает код в отложенном потоке блокировки:

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Итак, мой вопрос: кто-нибудь знает, почему мое изменение вызывает блокировку отложенного потока и / или кто-нибудь может предложить лучшее решение?

Ответы [ 3 ]

12 голосов
/ 18 марта 2010

Ну, а витые документы говорят:

Отложенные не делают код магически не блок

Всякий раз, когда вы используете блокирующий код, такой как sleep, вы должны перенести его на новый поток.

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

В случае, если API Redis не очень сложен, было бы более естественно переписать его, используя twisted.web, вместо того, чтобы просто вызывать API блокировки во многих потоках.

1 голос
/ 10 сентября 2010

Также имеется обновленный клиент Redis для Twisted, который уже поддерживает новый протокол и функции Redis 2.x. Вы должны определенно попробовать. Это называется txredisapi.

Для постоянной очереди сообщений я бы порекомендовал RestMQ. Система очереди сообщений на основе Redis, построенная на основе циклонов и txredisapi.

http://github.com/gleicon/restmq

Приветствия

0 голосов
/ 18 марта 2010

В связанной заметке вы, вероятно, могли бы получить много, используя Redis-клиент, созданный специально для Twisted, такой как этот: http://github.com/deldotdr/txRedis

...