Просто подбрасываю идеи. Вы можете создать базовую задачу, которая ожидает или повторяет попытку, если блокировка была получена задачей refresh_bearer_token при каждом ее вызове. В случае сбоя он запускает задачу refresh_bearer_token и также удаляет себя.
При повторной попытке копия выполняющейся задачи помещается в конец очереди
Теперь вам нужно будет реализовать какой-то вид блокировка, если блокировка уже получена, refresh_bearer_token ничего не делает, так как другая задача должна обновлять ее. Вам также необходимо добавить TTL к этой «блокировке», чтобы предотвратить некоторые условия, когда задача refresh_bearer_token
не выполняется
@app.task
def refresh_bearer_token():
try:
with aquire_lock(timeout=0):
refresh_token()
except TimeoutError:
pass
class RequiresOauthTask(app.Task):
abstract = True
def __call__(self, *args, **kwargs):
if lock_is_present():
self.retry() # or wait?
return super().__call__(*args, **kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, OauthError):
refresh_bearer_token.delay()
self.retry()
super().on_failure(exc, task_id, args, kwargs, einfo)
@app.task(base=RequiresOauthTask)
def my_task():
pass