Есть ли альтернатива переменным threading.Condition в python, которые лучше поддерживают таймауты без опроса? - PullRequest
3 голосов
/ 09 февраля 2010

Я использую условные переменные в потоках, которые требуют времени ожидания. До тех пор, пока я не увидел использование ЦП при наличии большого количества потоков, я заметил, что переменная условия, предоставляемая в поточном модуле, на самом деле не спит, а опрашивает, когда в качестве аргумента указано время ожидания.

Есть ли альтернатива этому, которая на самом деле спит как птхедс?

Кажется болезненным иметь много потоков, спящих с интервалом в несколько секунд, только чтобы он все еще потреблял процессорное время.

Спасибо!

Ответы [ 2 ]

3 голосов
/ 09 февраля 2010

Это кажется сложным в Python, но здесь есть одно решение. Он основан на порождении дополнительных потоков, но не использует опрос И гарантирует, что исходный поток проснется, как только истечет время ожидания или как только вернется исходный wait ().

Примечание. Следующий код включает в себя контрольный пример, который проверяет окончание условного ожидания как из-за тайм-аута, так и из-за уведомления.

from thread import start_new_thread
from threading import Condition, Timer

class ConditionWithoutPolling():
    """Implements wait() with a timeout without polling.  Wraps the Condition
    class."""
    def __init__(self, condition):
        self.condition = condition
        self.wait_timeout_condition = Condition()

    def wait(self, timeout=None):
        """Same as Condition.wait() but it does not use a poll-and-sleep method
        to implement timeouts.  Instead, if a timeout is requested two new
        threads are spawned to implement a non-pol-and-wait method."""
        if timeout is None:
            # just use the original implementation if no waiting is involved
            self.condition.wait()
            return
        else:
            # this new boolean will tell us whether we are done waiting or not
            done = [False]

            # wait on the original condition in a new thread
            start_new_thread(self.wait_on_original, (done,))

            # wait for a timeout (without polling) in a new thread
            Timer(timeout, lambda : self.wait_timed_out(done)).start()

            # wait for EITHER of the previous threads to stop waiting
            with self.wait_timeout_condition:
                while not done[0]:
                    self.wait_timeout_condition.wait()

    def wait_on_original(self, done):
        """Waits on the original Condition and signals wait_is_over when done."""
        self.condition.wait()
        self.wait_is_over(done)

    def wait_timed_out(self, done):
        """Called when the timeout time is reached."""
        # we must re-acquire the lock we were waiting on before we can return
        self.condition.acquire()
        self.wait_is_over(done)

    def wait_is_over(self, done):
        """Modifies done to indicate that the wait is over."""
        done[0] = True
        with self.wait_timeout_condition:
            self.wait_timeout_condition.notify()

    # wrap Condition methods since it wouldn't let us subclass it ...
    def acquire(self, *args):
        self.condition.acquire(*args)
    def release(self):
        self.condition.release()
    def notify(self):
        self.condition.notify()
    def notify_all(self):
        self.condition.notify_all()
    def notifyAll(self):
        self.condition.notifyAll()

def test(wait_timeout, wait_sec_before_notification):
    import time
    from threading import Lock
    lock = Lock()
    cwp = ConditionWithoutPolling(Condition(lock))
    start = time.time()

    def t1():
        with lock:
            print 't1 has the lock, will wait up to %f sec' % (wait_timeout,)
            cwp.wait(wait_timeout)
        time_elapsed = time.time() - start
        print 't1: alive after %f sec' % (time_elapsed,)        

    # this thread will acquire the lock and then conditionally wait for up to 
    # timeout seconds and then print a message 
    start_new_thread(t1, ())

    # wait until it is time to send the notification and then send it
    print 'main thread sleeping (will notify in %f sec)' % (wait_sec_before_notification,)
    time.sleep(wait_sec_before_notification)
    with lock:
        cwp.notifyAll()
        print 'notification sent, will continue in 2sec'
    time.sleep(2.0) # give the other time thread to finish before exiting

if __name__ == "__main__":
    print 'test wait() ending before the timeout ...'
    test(2.0, 1.0)

    print '\ntest wait() ending due to the timeout ...'
    test(2.0, 4.0)
1 голос
/ 09 февраля 2010

Я не знаком с Python, но если вы можете заблокировать переменную условия (без тайм-аута), вы можете реализовать тайм-аут самостоятельно. Пусть блокирующий поток запоминает время начала блокировки и устанавливает таймер для его сигнализации. Когда он проснется, проверьте, сколько времени прошло. Это не очень хороший способ сделать это, если вы не можете объединить таймеры в один поток, иначе количество потоков удвоится без причины.

...