Обработка исключений в сельдерее? - PullRequest
2 голосов
/ 08 января 2020

У меня есть группа задач, каждая из которых выполняет запрос к стандартной конечной точке API oauth и зависит от bearer_token . Задачи вызовут исключение во время обработки ответа, если срок действия bearer_token истек. Также существует задача refresh_bearer_token, которая обрабатывает обновление токена после его истечения.

Вот псевдокод для этого:

from proj.celery import app

bearer_token = '1234'

class OauthError(Exception):
    pass

@app.task
def api_request():
    response = request(bearer_token, ...)
    if response.bearer_token_expired:
        raise OauthError('oauth')

@app.task
def refresh_bearer_token():
    ...

Как мне запланировать refresh_bearer_token задание для выполнения всякий раз, когда поднимается OauthError?

Единственное решение, которое я могу найти, это использование link_error kwarg следующим образом:

@app.task
def error_callback(uuid):
    exception_msg = AsyncResult(uuid).get(propagate=False, disable_sync_subtasks=False)
    if exception_msg = 'oauth':
        refresh_bearer_token.delay()
    else:
        raise 

api_request.apply_async(link_error=error_callback.s())

Но это кажется неоптимальным для некоторых причины, в первую очередь потому, что она порождает синхронную дочернюю задачу в другой синхронной дочерней задаче, которая настоятельно игнорирует в документах.

Есть ли еще pythoni c способ ловли исключений в сельдерее?

Например:

def catch(func_that_requires_oauth):
    try:
        func_that_requires_oauth.delay()
    except OauthError:
        refresh_bearer_token.delay() | func_that_requires_oauth.delay()

1 Ответ

2 голосов
/ 08 января 2020

Просто подбрасываю идеи. Вы можете создать базовую задачу, которая ожидает или повторяет попытку, если блокировка была получена задачей refresh_bearer_token при каждом ее вызове. В случае сбоя он запускает задачу refresh_bearer_token и также удаляет себя.

При повторной попытке копия выполняющейся задачи помещается в конец очереди

Теперь вам нужно будет реализовать какой-то вид блокировка, если блокировка уже получена, refresh_bearer_token ничего не делает, так как другая задача должна обновлять ее. Вам также необходимо добавить TTL к этой «блокировке», чтобы предотвратить некоторые условия, когда задача refresh_bearer_token не выполняется

@app.task
def refresh_bearer_token():
    try:
        with aquire_lock(timeout=0):
            refresh_token()
    except TimeoutError:
        pass


class RequiresOauthTask(app.Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        if lock_is_present():
            self.retry()  # or wait?
        return super().__call__(*args, **kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if isinstance(exc, OauthError):
            refresh_bearer_token.delay()
            self.retry()
        super().on_failure(exc, task_id, args, kwargs, einfo)


@app.task(base=RequiresOauthTask)
def my_task():
    pass
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...