Поддерживает ли Python asyncio lock.acquire порядок? - PullRequest
1 голос
/ 02 мая 2019

Если у меня две функции, выполняющие

async with mylock.acquire():
    ....

Как только блокировка снята, гарантированно, что победит ожидающий первый, или порядок выбран по-другому?(например, случайным образом, произвольно, в последнюю очередь и т.получить замок никогда не выигрывает его.

Ответы [ 2 ]

2 голосов
/ 02 мая 2019

Когда мы говорим о том, как что-то работает, важно различать гарантию, выраженную в спецификации, и побочный эффект от реализации.Первое не должно быть изменено (по крайней мере, в основной версии), второе может быть изменено в любое время в будущем.

Ответ Мартина ясно показывает, что текущая реализация сохраняет порядок.Как насчет гарантии на будущее?

Официальная документация для Python 3.6 предоставляет гарантию:

только одна сопрограмма обрабатывается, когда вызов release () сбрасывает состояние до разблокированного; обрабатывается первая сопрограмма, заблокированная в acqu () .

Интересно, что ни документация для Python 3.7 , ни документация для Python3.8 dev есть эта строка, хотя не уверен, что она намеренная.Однако строка документации класса на github имеет гарантию .

Стоит также отметить, что threading.Lock (прототип для блокировки asyncio) явно говорит, что порядок не определен:

только один поток продолжает работу, когда вызов release () сбрасывает состояние до разблокированного; какой из ожидающих потоков продолжается, не определено и может варьироваться в зависимости от реализации.


Короче говоря, сейчас только строка документации класса обещает поддерживать порядок.Также справедливо отметить, что реализация блокировки вряд ли будет изменена в ближайшем будущем.

Однако представьте, однако, кто-то изменит ее (например, для повышения производительности).Будет ли достаточно документации, чтобы предотвратить реализацию блокировки с неопределенным порядком?Вам решать.

Если ваш код критически зависит от сохранения порядка и ожидается, что с долгим жизненным циклом ничего плохого не будет, если вы создадите свой собственный класс блокировки (под), который явно гарантирует порядок (OrderedLock иличто-то).Вы можете просто реализовать текущую реализацию.

Если ситуация проще, вы можете не беспокоиться об этом и использовать текущую реализацию.

1 голос
/ 02 мая 2019

Да, задачи, ожидающие блокировки, добавляются в очередь и просыпаются по принципу FIFO.

В частности, при попытке получить заблокированную блокировку future создается, который ждет сигнала о том, что блокировка стала доступной, и называется официант .Этот официант добавляется в collections.deque() двустороннюю очередь, , созданную в Lock.__init__()

self._waiters = collections.deque()

Когда блокировка освобождается задачей, удерживающей ее в данный момент, Lock._wake_up_first() метод вызывается:

def _wake_up_first(self):
    """Wake up the first waiter if it isn't done."""
    try:
        fut = next(iter(self._waiters))
    except StopIteration:
        return


    # .done() necessarily means that a waiter will wake up later on and
    # either take the lock, or, if it was cancelled and lock wasn't
    # taken already, will hit this again and wake up a new waiter.
    if not fut.done():
        fut.set_result(True)

Future.set_result() вызов отмечает будущее как выполненное.То, как именно это приводит к ожиданию в будущем задачи по восстановлению управления, зависит от реализации, но обычно это делается с помощью функции обратного вызова, переданной циклу событий для вызова при первой возможности.

Lock.acquire() метод отвечает как за добавление, так и за удаление фьючерсов (поскольку именно к этому вернется будущее, когда будет указано, что результат установлен):

fut = self._loop.create_future()
self._waiters.append(fut)

# Finally block should be called before the CancelledError
# handling as we don't want CancelledError to call
# _wake_up_first() and attempt to wake up itself.
try:
    try:
        await fut
    finally:
        self._waiters.remove(fut)
except futures.CancelledError:
    if not self._locked:
        self._wake_up_first()
    raise

Так что, если блокировка заблокирована, текущийЗадача состоит в том, чтобы ждать, создав объект будущего, который добавляется в очередь _waiters, и будущее ожидается.Это блокирует задачу, пока у будущего не будет результата (await fut не вернется до тех пор).Цикл обработки событий не даст этой задаче никакого времени обработки.

Другая задача, которая в настоящее время удерживает блокировку и снимает ее, вызовет первое (самое долгое ожидание) будущее из очереди _waiters с косвенным набором результатов.заставляя задачу, которая ждет этого будущего, снова стать активной.Когда задача освобождения блокировки возвращает управление циклу событий (при ожидании чего-то еще), цикл событий передает управление задаче, ожидающей этого будущего, будущее возвращается к строке await fut, будущее удаляется изочередь и блокировка дается задаче, которая ждала этого будущего.

Здесь есть один случай состояния гонки, который явно обрабатывает метод Lock.acquire():

  1. Задача A освобождаетблокировка, очередь содержит будущее для задачи B, ожидающей блокировки.Будущее установлено как выполненное.
  2. Цикл обработки событий передает управление третьей задаче C, которая ожидала чего-то неотвязанного, но теперь снова активна, и эта задача запускает код, который пытается получить блокировку.

Задача C не будет при условии блокировки, однако, потому что в верхней части Lock.acquire() метода находится этот тест:

if not self._locked and all(w.cancelled() for w in self._waiters):
    self._locked = True
    return True

not self._lockedверно в его случае, так как задача А выпустила его.Но all(w.cancelled() for w in self._waiters) нет, поскольку задача B имеет активное, неотменяемое будущее в очереди.Таким образом, задача C создана для добавления собственного будущего официанта в очередь.Разблокированная блокировка с активными фьючерсами в очереди _waiters фактически считается заблокированной.

...