Да, задачи, ожидающие блокировки, добавляются в очередь и просыпаются по принципу FIFO.
В частности, при попытке получить заблокированную блокировку future создается, который ждет сигнала о том, что блокировка стала доступной, и называется официант .Этот официант добавляется в collections.deque()
двустороннюю очередь, , созданную в Lock.__init__()
self._waiters = collections.deque()
Когда блокировка освобождается задачей, удерживающей ее в данный момент, Lock._wake_up_first()
метод вызывается:
def _wake_up_first(self):
"""Wake up the first waiter if it isn't done."""
try:
fut = next(iter(self._waiters))
except StopIteration:
return
# .done() necessarily means that a waiter will wake up later on and
# either take the lock, or, if it was cancelled and lock wasn't
# taken already, will hit this again and wake up a new waiter.
if not fut.done():
fut.set_result(True)
Future.set_result()
вызов отмечает будущее как выполненное.То, как именно это приводит к ожиданию в будущем задачи по восстановлению управления, зависит от реализации, но обычно это делается с помощью функции обратного вызова, переданной циклу событий для вызова при первой возможности.
Lock.acquire()
метод отвечает как за добавление, так и за удаление фьючерсов (поскольку именно к этому вернется будущее, когда будет указано, что результат установлен):
fut = self._loop.create_future()
self._waiters.append(fut)
# Finally block should be called before the CancelledError
# handling as we don't want CancelledError to call
# _wake_up_first() and attempt to wake up itself.
try:
try:
await fut
finally:
self._waiters.remove(fut)
except futures.CancelledError:
if not self._locked:
self._wake_up_first()
raise
Так что, если блокировка заблокирована, текущийЗадача состоит в том, чтобы ждать, создав объект будущего, который добавляется в очередь _waiters
, и будущее ожидается.Это блокирует задачу, пока у будущего не будет результата (await fut
не вернется до тех пор).Цикл обработки событий не даст этой задаче никакого времени обработки.
Другая задача, которая в настоящее время удерживает блокировку и снимает ее, вызовет первое (самое долгое ожидание) будущее из очереди _waiters
с косвенным набором результатов.заставляя задачу, которая ждет этого будущего, снова стать активной.Когда задача освобождения блокировки возвращает управление циклу событий (при ожидании чего-то еще), цикл событий передает управление задаче, ожидающей этого будущего, будущее возвращается к строке await fut
, будущее удаляется изочередь и блокировка дается задаче, которая ждала этого будущего.
Здесь есть один случай состояния гонки, который явно обрабатывает метод Lock.acquire()
:
- Задача A освобождаетблокировка, очередь содержит будущее для задачи B, ожидающей блокировки.Будущее установлено как выполненное.
- Цикл обработки событий передает управление третьей задаче C, которая ожидала чего-то неотвязанного, но теперь снова активна, и эта задача запускает код, который пытается получить блокировку.
Задача C не будет при условии блокировки, однако, потому что в верхней части Lock.acquire()
метода находится этот тест:
if not self._locked and all(w.cancelled() for w in self._waiters):
self._locked = True
return True
not self._locked
верно в его случае, так как задача А выпустила его.Но all(w.cancelled() for w in self._waiters)
нет, поскольку задача B имеет активное, неотменяемое будущее в очереди.Таким образом, задача C создана для добавления собственного будущего официанта в очередь.Разблокированная блокировка с активными фьючерсами в очереди _waiters
фактически считается заблокированной.