Как добавить тайм-аут в Deferred из API-интерфейса Twisted deferToThread? - PullRequest
6 голосов
/ 18 апреля 2011
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import defer
import time

def worker(arg):
    print 'Hello world'
     time.sleep(10)
    return 1

def run():
    print 'Starting workers'
    l = []
    for x in range(2):
        l.append(threads.deferToThread(worker, x))
    return defer.DeferredList(l)

def res(results):
    print results
    reactor.stop()

d = run()
d.addCallback(res)
reactor.run()

Как остановить работников по таймауту?

Ответы [ 4 ]

5 голосов
/ 18 апреля 2011

Потоки не могут быть прерваны, если они не сотрудничают с вами. time.sleep(10) не собирается сотрудничать, поэтому я не думаю, что вы можете прервать этого работника. Если у вас есть работник другого типа, который имеет несколько отдельных фаз или работает по циклу над некоторыми задачами, то вы можете сделать что-то вроде этого:

def worker(stop, jobs):
    for j in jobs:
        if stop:
            break
        j.do()

stop = []
d = deferToThread(worker)

# This will make the list eval to true and break out of the loop.
stop.append(None)

Это не специфично для Twisted. Так работают потоки в Python.

3 голосов
/ 26 сентября 2013

Хотя прерывание потоков может быть невозможным, Deferred можно остановить с помощью функции cancel, которая, я думаю, доступна в Twisted 10.1.0 и более поздних версиях.

Я использовалследующий класс, чтобы сделать Deferreds, который вызывает конкретную функцию, если Deferred не сработал через некоторое время.Это может быть полезно для кого-то, у кого есть тот же вопрос, который был задан в теме ОП.

РЕДАКТИРОВАТЬ: Как предлагается в комментариях ниже, лучше не наследовать от defer.Deferred.Поэтому я изменил код для использования оболочки, которая достигает того же эффекта.

class DeferredWrapperWithTimeout(object):
    '''
    Holds a deferred that allows a specified function to be called-back
    if the deferred does not fire before some specified timeout.
    '''
    def __init__(self, canceller=None):
        self._def = defer.Deferred(canceller)

    def _finish(self, r, t):
        '''
        Function to be called (internally) after the Deferred
        has fired, in order to cancel the timeout.
        '''
        if ( (t!=None) and (t.active()) ):
            t.cancel()
        return r

    def getDeferred(self):
        return self._def

    def addTimeoutCallback(self, reactr, timeout,
                           callUponTimeout, *args, **kw):
        '''
        The function 'callUponTimeout' (with optional args or keywords)
        will be called after 'timeout' seconds, unless the Deferred fires.
        '''

        def timeoutCallback():
            self._def.cancel()
            callUponTimeout(*args, **kw)
        toc = reactr.callLater(timeout, timeoutCallback)
        return self._def.addCallback(self._finish, toc)

Пример обратный вызов до истечения времени ожидания:

from twisted.internet import reactor

from DeferredWithTimeout import *

dw = DeferredWrapperWithTimeout()
d  = dw.getDeferred()

def testCallback(x=None):
    print "called"

def testTimeout(x=None):
    print "timedout"

d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.callLater(2, d.callback, "cb")
reactor.run()

Печать"и ничего больше.

Пример Тайм-аут до обратного вызова:

from twisted.internet import reactor

from DeferredWithTimeout import *

dw = DeferredWrapperWithTimeout()
d  = dw.getDeferred()

def testCallback(x=None):
    print "called"

def testTimeout(x=None):
    print "timedout"

d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.run()

Печатает" Тайм-аут "через 20 секунд, и ничего больше.

0 голосов
/ 15 сентября 2015

Мы делаем это, используя декоратор. Преимущество этого метода в том, что отсроченный отменяется при достижении времени ожидания. Это должно как-то стать частью библиотеки Twisted imho

from twisted.internet import defer, reactor

def timeout(secs):
    """Decorator to add timeout to Deferred calls"""
    def wrap(func):
        @defer.inlineCallbacks
        def _timeout(*args, **kwargs):
            raw_d = func(*args, **kwargs)
            if not isinstance(raw_d, defer.Deferred):
                defer.returnValue(raw_d)

            timeout_d = defer.Deferred()
            times_up = reactor.callLater(secs, timeout_d.callback, None)

            try:
                raw_result, timeout_result = yield defer.DeferredList(
                    [raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True,
                    consumeErrors=True)
            except defer.FirstError as e:  # Only raw_d should raise an exception
                assert e.index == 0
                times_up.cancel()
                e.subFailure.raiseException()
            else:  # timeout
                if timeout_d.called:
                    raw_d.cancel()
                    raise Exception("%s secs have expired" % secs)

            # no timeout
            times_up.cancel()
            defer.returnValue(raw_result)
        return _timeout
return wrap
0 голосов
/ 22 марта 2014

Ну, мой ответ не о потоках, но, как было сказано, вы можете реализовать функцию тайм-аута как отдельный помощник:

from twisted.internet import defer

def add_watchdog(deferred, timeout=0.05):

    def callback(value):
        if not watchdog.called:
            watchdog.cancel()
        return value

    deferred.addBoth(callback)

    from twisted.internet import reactor
    watchdog = reactor.callLater(timeout, defer.timeout, deferred)

d = defer.Deferred()
add_watchdog(d)

Тогда вы можете поймать defer.TimeoutError в отложенный ответ, если вам нужно.

...