Как реализовать блокировку с таймаутом в Python 2.7 - PullRequest
22 голосов
/ 06 декабря 2011

Есть ли способ реализовать блокировку в Python для целей многопоточности, чей метод acquire может иметь произвольное время ожидания?Единственные рабочие решения, которые я нашел до сих пор, используют опрос, который

  • я считаю неэффективным и неэффективным
  • Не сохраняет ограниченную гарантию ожидания / прогресса блокировки как решение проблемыпроблема критической секции

Есть ли лучший способ реализовать это?

Ответы [ 6 ]

21 голосов
/ 06 декабря 2011

, чтобы уточнить комментарий Стивена:

import threading
import time

lock = threading.Lock()
cond = threading.Condition(threading.Lock())

def waitLock(timeout):
    with cond:
        current_time = start_time = time.time()
        while current_time < start_time + timeout:
            if lock.acquire(False):
                return True
            else:
                cond.wait(timeout - current_time + start_time)
                current_time = time.time()
    return False

На что обратить внимание:

  • есть два threading.Lock() объекта, один из которых является внутренним для threading.Condition().
  • при манипулировании cond блокировка получена; однако операция wait() разблокирует ее, поэтому ее может просматривать любое количество потоков.
  • ожидание встроено в цикл for, который отслеживает время. threading.Condition может получать уведомления по причинам, отличным от тайм-аутов, поэтому вам все равно нужно отслеживать время, если вы действительно хотите, чтобы оно истекло.
  • даже при условии, что вы все еще «опрашиваете» реальную блокировку, потому что возможно пробуждение более чем одного потока и борьба за блокировку. если блокировка lock.acquire завершается неудачей, цикл возвращается в режим ожидания.
  • абоненты этой функции waitLock должны следовать lock.release() с cond.notify(), чтобы другие потоки, ожидающие ее, были уведомлены о том, что они должны повторить попытку получения блокировки. Это не показано в примере.
5 голосов
/ 04 января 2014

Моя версия с использованием потоковобезопасных очередей http://docs.python.org/2/library/queue.html и их методов put / get, которые поддерживают тайм-аут.

До сих пор работает нормально, но если кто-то может сделать рецензирование на нем, я будубудь благодарен.

"""
Thread-safe lock mechanism with timeout support module.
"""

from threading import ThreadError, current_thread
from Queue import Queue, Full, Empty


class TimeoutLock(object):
    """
    Thread-safe lock mechanism with timeout support.
    """

    def __init__(self, mutex=True):
        """
        Constructor.
        Mutex parameter specifies if the lock should behave like a Mutex, and
        thus use the concept of thread ownership.
        """
        self._queue = Queue(maxsize=1)
        self._owner = None
        self._mutex = mutex

    def acquire(self, timeout=0):
        """
        Acquire the lock.
        Returns True if the lock was succesfully acquired, False otherwise.

        Timeout:
        - < 0 : Wait forever.
        -   0 : No wait.
        - > 0 : Wait x seconds.
        """
        th = current_thread()
        try:
            self._queue.put(
                th, block=(timeout != 0),
                timeout=(None if timeout < 0 else timeout)
            )
        except Full:
            return False

        self._owner = th
        return True

    def release(self):
        """
        Release the lock.
        If the lock is configured as a Mutex, only the owner thread can release
        the lock. If another thread attempts to release the lock a
        ThreadException is raised.
        """
        th = current_thread()
        if self._mutex and th != self._owner:
            raise ThreadError('This lock isn\'t owned by this thread.')

        self._owner = None
        try:
            self._queue.get(False)
            return True
        except Empty:
            raise ThreadError('This lock was released already.')
2 голосов
/ 19 февраля 2019

Если кому-то нужен Python> = 3,2 API:

import threading
import time


class Lock(object):
    _lock_class = threading.Lock

    def __init__(self):
        self._lock = self._lock_class()
        self._cond = threading.Condition(threading.Lock())

    def acquire(self, blocking=True, timeout=-1):
        if not blocking or timeout == 0:
            return self._lock.acquire(False)
        cond = self._cond
        lock = self._lock
        if timeout < 0:
            with cond:
                while True:
                    if lock.acquire(False):
                        return True
                    else:
                        cond.wait()
        else:
            with cond:
                current_time = time.time()
                stop_time = current_time + timeout
                while current_time < stop_time:
                    if lock.acquire(False):
                        return True
                    else:
                        cond.wait(stop_time - current_time)
                        current_time = time.time()
                return False

    def release(self):
        with self._cond:
            self._lock.release()
            self._cond.notify()

    __enter__ = acquire

    def __exit__(self, t, v, tb):
        self.release()


class RLock(Lock):
    _lock_class = threading.RLock
1 голос
/ 06 декабря 2011

Я сомневаюсь, что это можно сделать.

Если вы хотите реализовать это без какого-либо опроса, то вам нужно, чтобы ОС знала, что поток заблокирован, и ОС должна знать время ожидания, чтобы через некоторое время разблокировать поток. Для этого поддержка должна уже существовать в ОС; Вы не можете реализовать это на уровне Python.

(Вы можете заблокировать поток на уровне ОС или на уровне приложения и иметь механизм, с помощью которого он может быть вызван другим потоком в соответствующее время, но тогда вам нужен этот другой поток для эффективного опроса )

В общем, у вас все равно нет действительно ограниченной гарантии ожидания / прогресса блокировки, поскольку вашему потоку придется ждать неограниченное время, пока не произойдет переключение контекста, чтобы он заметил, что он разблокирован. Таким образом, если вы не сможете установить верхнюю границу количества конфликтов процессора, вы не сможете использовать тайм-аут, чтобы уложиться в какие-либо жесткие сроки в реальном времени. Но вам, вероятно, это не нужно, иначе вы не мечтали бы использовать блокировки, реализованные в Python.


Из-за Python GIL (Global Interpreter Lock) эти основанные на опросе решения, вероятно, не столь неэффективны или не столь безграничны, как вы думаете (в зависимости от того, как они реализованы) (и при условии, что вы используете либо CPython или PyPy).

В каждый момент времени работает только один поток, и по определению есть еще один поток, который вы хотите запустить (тот, который содержит блокировку, которую вы ожидаете). GIL некоторое время удерживается одним потоком для выполнения группы байт-кодов, затем отбрасывается и повторно запрашивается, чтобы дать кому-то еще шанс. Таким образом, если поток с блокированным тайм-аутом находится в цикле, проверяющем время и уступающем другим потокам, он будет просыпаться только изредка, когда получит GIL, а затем почти сразу же отбросит его обратно кому-то еще и заблокирует Джил снова. Поскольку этот поток может когда-либо проснуться только тогда, когда он получит поворот в GIL, он также выполнит эту проверку, как только истечет время ожидания, так как он сможет возобновить выполнение, даже если время ожидания было волшебно совершенным.

Единственный раз, когда это приведет к значительной неэффективности, это если ваш поток заблокирован в ожидании потока, удерживающего блокировку, который заблокирован в ожидании чего-то, что не может быть вызвано другим потоком Python (скажем, заблокированным в IO) и нет никаких других запущенных потоков Python. Тогда ваш тайм-аут опроса действительно будет просто сидеть и проверять время, что может быть плохо, если вы ожидаете, что эта ситуация произойдет в течение длительного периода времени.

0 голосов
/ 04 октября 2018

Хорошо, это уже реализовано в python 3.2 или выше: https://docs.python.org/3/library/threading.html Ищите потоки. TIMEOUT_MAX

Но я улучшил тестовый пример по сравнению с версией frans ... хотя это уже пустая трата времени, если вы используете py3.2 или выше:

from unittest.mock import patch, Mock
import unittest

import os
import sys
import logging
import traceback
import threading
import time

from Util import ThreadingUtil

class ThreadingUtilTests(unittest.TestCase):

    def setUp(self):
        pass

    def tearDown(self):
        pass

    # https://www.pythoncentral.io/pythons-time-sleep-pause-wait-sleep-stop-your-code/
    def testTimeoutLock(self):

        faulted = [False, False, False]

        def locking_thread_fn(threadId, lock, duration, timeout):
            try:
                threadName = "Thread#" + str(threadId)
                with ThreadingUtil.TimeoutLock(threadName, lock, timeout=timeout, raise_on_timeout=True):
                    print('%x: "%s" begins to work..' % (threading.get_ident(), threadName))
                    time.sleep(duration)
                    print('%x: "%s" finished' % (threading.get_ident(), threadName))
            except:
                faulted[threadId] = True

        _lock = ThreadingUtil.TimeoutLock.lock()

        _sleepDuration = [5, 10, 1]
        _threads = []

        for i in range(3):
            _duration = _sleepDuration[i]
            _timeout = 6
            print("Wait duration (sec): " + str(_duration) + ", Timeout (sec): " + str(_timeout))
            _worker = threading.Thread(
                                        target=locking_thread_fn, 
                                        args=(i, _lock, _duration, _timeout)
                                    )
            _threads.append(_worker)

        for t in _threads: t.start()
        for t in _threads: t.join()

        self.assertEqual(faulted[0], False)
        self.assertEqual(faulted[1], False)
        self.assertEqual(faulted[2], True)

Теперь в папке «Util» у меня есть «ThreadingUtil.py»:

import time
import threading

# /8074241/kak-realizovat-blokirovku-s-taimautom-v-python-2-7
# https://docs.python.org/3.4/library/asyncio-sync.html#asyncio.Condition
# https://stackoverflow.com/questions/28664720/how-to-create-global-lock-semaphore-with-multiprocessing-pool-in-python
# https://hackernoon.com/synchronization-primitives-in-python-564f89fee732

class TimeoutLock(object):
    ''' taken from https://stackoverflow.com/a/8393033/1668622
    '''
    class lock:
        def __init__(self):
            self.owner = None
            self.lock = threading.Lock()
            self.cond = threading.Condition()

        def _release(self):
            self.owner = None
            self.lock.release()
            with self.cond:
                self.cond.notify()

    def __init__(self, owner, lock, timeout=1, raise_on_timeout=False):
        self._owner = owner
        self._lock = lock
        self._timeout = timeout
        self._raise_on_timeout = raise_on_timeout

    # http://effbot.org/zone/python-with-statement.htm
    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, tb):
        ''' will only be called if __enter__ did not raise '''
        self.release()

    def acquire(self):
        if self._raise_on_timeout:
            if not self._waitLock():
                raise RuntimeError('"%s" could not aquire lock within %d sec'
                                   % (self._owner, self._timeout))
        else:
            while True:
                if self._waitLock():
                    break
                print('"%s" is waiting for "%s" and is getting bored...'
                      % (self._owner, self._lock.owner))
        self._lock.owner = self._owner

    def release(self):
        self._lock._release()

    def _waitLock(self):
        with self._lock.cond:
            _current_t = _start_t = time.time()
            while _current_t < _start_t + self._timeout:
                if self._lock.lock.acquire(False):
                    return True
                else:
                    self._lock.cond.wait(self._timeout - _current_t + _start_t)
                    _current_t = time.time()
        return False
0 голосов
/ 14 августа 2015

Я взял SingleNegationElimination ответ и создал класс с, который можно использовать в with -статизме следующим образом:

global_lock = timeout_lock()
...

with timeout_lock(owner='task_name', lock=global_lock):
    do()
    some.stuff()

Таким образом, он будет только предупреждать , если истекло время ожидания (по умолчанию = 1 с), и покажет владельца блокировки для расследования.

Используйте его таким образом, и по истечении времени ожидания будет сгенерировано исключение:

with timeout_lock(owner='task_name', lock=global_lock, raise_on_timeout=True):
    do()
    some.stuff()

Экземпляр timeout_lock.lock() должен быть создан один раз и может использоваться в разных потоках.

Вот класс - он работает для меня, но не стесняйтесь комментировать и улучшать:

class timeout_lock:
    ''' taken from https://stackoverflow.com/a/8393033/1668622
    '''
    class lock:
        def __init__(self):
            self.owner = None
            self.lock = threading.Lock()
            self.cond = threading.Condition()

        def _release(self):
            self.owner = None
            self.lock.release()
            with self.cond:
                self.cond.notify()

    def __init__(self, owner, lock, timeout=1, raise_on_timeout=False):
        self._owner = owner
        self._lock = lock
        self._timeout = timeout
        self._raise_on_timeout = raise_on_timeout

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, tb):
        ''' will only be called if __enter__ did not raise '''
        self.release()

    def acquire(self):
        if self._raise_on_timeout:
            if not self._waitLock():
                raise RuntimeError('"%s" could not aquire lock within %d sec'
                                   % (self._owner, self._timeout))
        else:
            while True:
                if self._waitLock():
                    break
                print('"%s" is waiting for "%s" and is getting bored...'
                      % (self._owner, self._lock.owner))
        self._lock.owner = self._owner

    def release(self):
        self._lock._release()

    def _waitLock(self):
        with self._lock.cond:
            _current_t = _start_t = time.time()
            while _current_t < _start_t + self._timeout:
                if self._lock.lock.acquire(False):
                    return True
                else:
                    self._lock.cond.wait(self._timeout - _current_t + _start_t)
                    _current_t = time.time()
        return False

Чтобы быть уверенным, что потоки действительно не мешают и не ждут получения уведомлений как можно скорее, я написал небольшой тест многопоточности, который подытожит время, необходимое для запуска всех потоков:

def test_lock_guard():
    import random

    def locking_thread_fn(name, lock, duration, timeout):
        with timeout_lock(name, lock, timeout=timeout):
            print('%x: "%s" begins to work..' % (threading.get_ident(), name))
            time.sleep(duration)
            print('%x: "%s" finished' % (threading.get_ident(), name))

    _lock = timeout_lock.lock()

    _threads = []
    _total_d = 0
    for i in range(3):
        _d = random.random() * 3
        _to = random.random() * 2
        _threads.append(threading.Thread(
            target=locking_thread_fn, args=('thread%d' % i, _lock, _d, _to)))
        _total_d += _d

    _t = time.time()

    for t in _threads: t.start()
    for t in _threads: t.join()

    _t = time.time() - _t

    print('duration: %.2f sec / expected: %.2f (%.1f%%)'
          % (_t, _total_d, 100 / _total_d * _t))

Вывод:

7f940fc2d700: "thread0" begins to work..
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
7f940fc2d700: "thread0" finished
7f940f42c700: "thread1" begins to work..
"thread2" is waiting for "thread1" and is getting bored...
"thread2" is waiting for "thread1" and is getting bored...
7f940f42c700: "thread1" finished
"thread2" is waiting for "None" and is getting bored...
7f940ec2b700: "thread2" begins to work..
7f940ec2b700: "thread2" finished
duration: 5.20 sec / expected: 5.20 (100.1%)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...