Я понятия не имею, называется ли такая блокировка временной блокировкой, но мне нужно что-то для следующего сценария: я делаю много одновременных запросов с aiohttp
, и возможно, что сервер в какой-то момент вернется429 Too Many Requests
.В этом случае мне придется приостановить все последующие запросы на некоторое время.
Я нашел следующее решение:
import asyncio
class TimeLock:
def __init__(self, *, loop=None):
self._locked = False
self._locked_at = None
self._time_lock = None
self._unlock_task = None
self._num_waiters = 0
if loop is not None:
self._loop = loop
else:
self._loop = asyncio.get_event_loop()
def __repr__(self):
state = f'locked at {self.locked_at}' if self._locked else 'unlocked'
return f'[{state}] {self._num_waiters} waiters'
@property
def locked(self):
return self._locked
@property
def locked_at(self):
return self._locked_at
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
# in this time lock there is nothing to do when it's released
return
async def acquire(self):
if not self._locked:
return True
try:
print('waiting for lock to be released')
self._num_waiters += 1
await self._time_lock
self._num_waiters -= 1
print('done, returning now')
except asyncio.CancelledError:
if self._locked:
raise
return True
def lock_for(self, delay, lock_more=False):
print(f'locking for {delay}')
if self._locked:
if not lock_more:
# if we don't want to increase the lock time, we just exit when
# the lock is already in a locked state
print('already locked, nothing to do')
return
print('already locked, but canceling old unlock task')
self._unlock_task.cancel()
self._locked = True
self._locked_at = time.time()
self._time_lock = self._loop.create_future()
self._unlock_task = self._loop.create_task(self.unlock_in(delay))
print('locked')
async def unlock_in(self, delay):
print('unlocking started')
await asyncio.sleep(delay)
self._locked = False
self._locked_at = None
self._unlock_task = None
self._time_lock.set_result(True)
print('unlocked')
Я тестирую блокировку с помощью этого кода:
import asyncio
from ares.http import TimeLock
async def run(lock, i):
async with lock:
print(lock)
print(i)
if i in (3, 6, 9):
lock.lock_for(2)
if __name__ == '__main__':
lock = TimeLock()
tasks = []
loop = asyncio.get_event_loop()
for i in range(10):
tasks.append(run(lock, i))
loop.run_until_complete(asyncio.gather(*tasks))
print(lock)
Код производит следующий вывод, который, кажется, согласуется с тем, что я хочу из приведенного выше сценария:
[unlocked] 0 waiters
0
[unlocked] 0 waiters
1
[unlocked] 0 waiters
2
[unlocked] 0 waiters
3
locking for 2
locked
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
unlocking started
unlocked
done, returning now
[unlocked] 5 waiters
4
done, returning now
[unlocked] 4 waiters
5
done, returning now
[unlocked] 3 waiters
6
locking for 2
locked
done, returning now
[locked at 1559496296.7109463] 2 waiters
7
done, returning now
[locked at 1559496296.7109463] 1 waiters
8
done, returning now
[locked at 1559496296.7109463] 0 waiters
9
locking for 2
already locked, nothing to do
unlocking started
[locked at 1559496296.7109463] 0 waiters
Это правильный способ реализации этого примитива синхронизации?Я также не уверен насчет безопасности этого кода.У меня нет большого опыта работы с потоками и асинхронным кодом.