У меня есть задачи, которые выполняются в пуле потоков, которые разделяют блокировку чтения и записи. эти задачи возвращают фьючерсы, если выполнение завершается. Блокировка повторной записи чтения будет ожидать состояния, когда блокировка испытывает конфликт.
Используемая мной библиотека предоставляет метод wait_for_any для извлечения одного или нескольких законченных фьючерсов из коллекции задач. Однако, даже если один или несколько фьючерсов завершили, метод wait_for_any не сможет вернуться, пока все фьючерсы не будут завершены. Кроме того, метод wait_for_any предоставляет параметр времени ожидания, который впоследствии игнорируется, если он установлен.
Мой вопрос заключается в том, что я делаю неправильно, что может привести к блокировке такого метода wait_for_any? Понимаю ли я, что в Python реализовано условное ожидание и неправильно уведомляются, и будут ли эти конструкции полностью блокировать каждый поток в Python?
Используемая мной библиотека называется Futurist и поддерживается фондом OpenStack. Вот ссылки на соответствующие классы и методы, которые я использую: GreenThreadPoolExecutor и waiters.wait_for_any
Вот ReentrantReadWriteLock:
class ReentrantReadWriteLock(object):
def __init__(self):
self._read_lock = RLock()
self._write_lock = RLock()
self._condition = Condition
self._num_readers = 0
self._wants_write = False
def read_acquire(self, blocking=True):
int_lock = False
try:
if self._read_lock.acquire(blocking):
int_lock = True
LOG.warning("read internal lock acquired")
while self._wants_write:
LOG.warning("read wants write true")
if not blocking:
LOG.warning("read non blocking")
return False
LOG.warning("read wait")
with self._condition:
self._condition.wait()
first_it = False
LOG.warning("read acquired lock")
self._num_readers += 1
return True
LOG.warning("read internal lock failed")
return False
finally:
if int_lock:
self._read_lock.release()
def write_acquire(self, blocking=True):
int_lock = False
try:
if self._write_lock.acquire(blocking):
int_lock = True
LOG.warning("write internal lock acquired")
while self._num_readers > 0 or self._wants_write:
LOG.warning("write wants write true or num read")
if not blocking:
LOG.warning("write non blocking")
return False
LOG.warning("write wait")
with self._condition:
self._condition.wait()
first_it = False
LOG.warning("write acquired lock")
self._wants_write = True
return True
LOG.warning("write internal lock failed")
return False
finally:
if int_lock:
self._write_lock.release()
Для проверкиБлокировка и блокировка на неопределенный срок Я делаю следующее:
def get_read(self, rrwlock):
return rrwlock.read_acquire()
def get_write(self, rrwlock):
return rrwlock.write_acquire()
def test():
self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
rrwlock = ReentrantReadWriteLock()
futures = []
futures.append(self._threadpool.submit(self.get_read, rrwlock))
futures.append(self._threadpool.submit(self.get_write, rrwlock))
# Get the results and verify only one of the calls succeeded
# assert that the other call is still pending
results = waiters.wait_for_any(futures)
self.assertTrue(results[0].pop().result)
self.assertEqual(1, len(results[1]))
В этом примере выполнение results = waiters.wait_for_any(futures)
блокируется на неопределенный срок. Это сильно смущает меня. Я надеюсь, что кто-то может дать мне объяснение этого поведения.
Обновление 2019-10-16 18:55:00 UTC : блокировка основного потока не ограничивается этим ReentrantReadWriteLockреализации, но также происходит при использовании библиотек, таких как readerwriterlock .
Обновление 2019-10-17 08:15:00 UTC Я отправил это как отчет об ошибке вСопровождающие футуриста находятся на панели запуска, поскольку я считаю, что это поведение некорректно: Отчет об ошибках панели запуска
Обновление 2019-10-20 09:02:00 UTC IС тех пор я заметил, что вызов внутри футуристической библиотеки блокируется: waiter.event.wait (timeout) Похоже, что подобная проблема была передана в Python 3.3 и 3.4 и с тех пор закрыта: closedвыпуск
обновление 2019-10-21 09:06:00 UTC Исправлено исправление для библиотеки футуристов, чтобы попытаться решить эту проблему .
Обновление 2019-10-22 08:03:00 UTC Представленный патч не решил проблему. При трассировке waiter.event.wait(timeout)
блоки вызовов в функции ожидания Python threading.py при вызове waiter.acquire () .
Обновление 2019-10-23 07:17:00 UTC Я создал небольшой репозиторий , который демонстрирует, что это возможно с помощью встроенного ThreadPoolExecutor и futures. Я начинаю подозревать, что это ограничение в CPython, вызванное GIL. Следующий код демонстрирует работу демонстрации с использованием той же блокировки, как показано выше:
from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor
def read_lock(lock):
lock.read_acquire()
def write_lock(lock):
lock.write_acquire()
def main():
local_lock = ReentrantReadWriteLock()
with ThreadPoolExecutor(max_workers=2) as executor:
# First task will submit fine
future = executor.submit(read_lock, local_lock)
# Second one will block indefinitely
future2 = executor.submit(write_lock, local_lock)
Обновление 2019-10-31 07:36:00 UTC Блокировка чтения при повторной регистрации имеетбыл обновлен так, что он работает с Python 2.7 и соответствует тому, что написано в демо-репозитории на github .
Кроме того, было обнаружено, что демонстрационная версия родного пула потоков, как описано в 2019-10-23 не работает, потому что вместе с последним оператором
future2 = executor.submit(write_lock, local_lock)
Будет вызван метод __exit__
пула потоков. Естественно, этот метод пытается полностью закрыть все запущенные в данный момент потоки, что невозможно из-за удерживаемой блокировки. Пример был обновлен с помощью spin_for_any примера:
futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))
# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
for f in futures:
if f.done():
futures.remove(f)
f.result()
print("Future done")
Этот собственный пример Python для параллелизма spin_for_any работает полностью, как и ожидалось.