Вызов condition.wait () внутри потока приводит к блокированию поиска любого будущего в основном потоке. - PullRequest
13 голосов
/ 16 октября 2019

У меня есть задачи, которые выполняются в пуле потоков, которые разделяют блокировку чтения и записи. эти задачи возвращают фьючерсы, если выполнение завершается. Блокировка повторной записи чтения будет ожидать состояния, когда блокировка испытывает конфликт.

Используемая мной библиотека предоставляет метод wait_for_any для извлечения одного или нескольких законченных фьючерсов из коллекции задач. Однако, даже если один или несколько фьючерсов завершили, метод wait_for_any не сможет вернуться, пока все фьючерсы не будут завершены. Кроме того, метод wait_for_any предоставляет параметр времени ожидания, который впоследствии игнорируется, если он установлен.

Мой вопрос заключается в том, что я делаю неправильно, что может привести к блокировке такого метода wait_for_any? Понимаю ли я, что в Python реализовано условное ожидание и неправильно уведомляются, и будут ли эти конструкции полностью блокировать каждый поток в Python?

Используемая мной библиотека называется Futurist и поддерживается фондом OpenStack. Вот ссылки на соответствующие классы и методы, которые я использую: GreenThreadPoolExecutor и waiters.wait_for_any

Вот ReentrantReadWriteLock:

class ReentrantReadWriteLock(object):
    def __init__(self):

        self._read_lock = RLock()
        self._write_lock = RLock()
        self._condition = Condition
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._read_lock.acquire(blocking):
                int_lock = True
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking:
                        LOG.warning("read non blocking")
                        return False
                    LOG.warning("read wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            if int_lock:
                 self._read_lock.release()

    def write_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._write_lock.acquire(blocking):
                int_lock = True
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking:
                        LOG.warning("write non blocking")
                        return False
                    LOG.warning("write wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            if int_lock:
                self._write_lock.release()

Для проверкиБлокировка и блокировка на неопределенный срок Я делаю следующее:

def get_read(self, rrwlock):
    return rrwlock.read_acquire()

def get_write(self, rrwlock):
    return rrwlock.write_acquire()

def test():
    self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
    rrwlock = ReentrantReadWriteLock()
    futures = []
    futures.append(self._threadpool.submit(self.get_read, rrwlock))
    futures.append(self._threadpool.submit(self.get_write, rrwlock))

    # Get the results and verify only one of the calls succeeded
    # assert that the other call is still pending
    results = waiters.wait_for_any(futures)
    self.assertTrue(results[0].pop().result)
    self.assertEqual(1, len(results[1]))

В этом примере выполнение results = waiters.wait_for_any(futures) блокируется на неопределенный срок. Это сильно смущает меня. Я надеюсь, что кто-то может дать мне объяснение этого поведения.

Обновление 2019-10-16 18:55:00 UTC : блокировка основного потока не ограничивается этим ReentrantReadWriteLockреализации, но также происходит при использовании библиотек, таких как readerwriterlock .

Обновление 2019-10-17 08:15:00 UTC Я отправил это как отчет об ошибке вСопровождающие футуриста находятся на панели запуска, поскольку я считаю, что это поведение некорректно: Отчет об ошибках панели запуска

Обновление 2019-10-20 09:02:00 UTC IС тех пор я заметил, что вызов внутри футуристической библиотеки блокируется: waiter.event.wait (timeout) Похоже, что подобная проблема была передана в Python 3.3 и 3.4 и с тех пор закрыта: closedвыпуск

обновление 2019-10-21 09:06:00 UTC Исправлено исправление для библиотеки футуристов, чтобы попытаться решить эту проблему .

Обновление 2019-10-22 08:03:00 UTC Представленный патч не решил проблему. При трассировке waiter.event.wait(timeout) блоки вызовов в функции ожидания Python threading.py при вызове waiter.acquire () .

Обновление 2019-10-23 07:17:00 UTC Я создал небольшой репозиторий , который демонстрирует, что это возможно с помощью встроенного ThreadPoolExecutor и futures. Я начинаю подозревать, что это ограничение в CPython, вызванное GIL. Следующий код демонстрирует работу демонстрации с использованием той же блокировки, как показано выше:

from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor

def read_lock(lock):
    lock.read_acquire()

def write_lock(lock):
    lock.write_acquire()

def main():
    local_lock = ReentrantReadWriteLock()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # First task will submit fine
        future = executor.submit(read_lock, local_lock)
        # Second one will block indefinitely
        future2 = executor.submit(write_lock, local_lock)

Обновление 2019-10-31 07:36:00 UTC Блокировка чтения при повторной регистрации имеетбыл обновлен так, что он работает с Python 2.7 и соответствует тому, что написано в демо-репозитории на github .

Кроме того, было обнаружено, что демонстрационная версия родного пула потоков, как описано в 2019-10-23 не работает, потому что вместе с последним оператором

future2 = executor.submit(write_lock, local_lock)

Будет вызван метод __exit__ пула потоков. Естественно, этот метод пытается полностью закрыть все запущенные в данный момент потоки, что невозможно из-за удерживаемой блокировки. Пример был обновлен с помощью spin_for_any примера:

futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))

# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
    for f in futures:
        if f.done():
            futures.remove(f)
            f.result()
            print("Future done")

Этот собственный пример Python для параллелизма spin_for_any работает полностью, как и ожидалось.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...