Как остановить выполнение цепочки для невыполненной задачи в середине цепочки? - PullRequest
0 голосов
/ 11 февраля 2020

Вопрос

Как остановить выполнение цепочки для задачи, которая не выполняется где-то в середине цепочки?

Пример

@app.task
def ok(i):
    print(f"ok = {i}")
    return i + 1


@app.task
def fail(i):
    print(f"fail = {i}")
    raise RuntimeError(str(i))


if __name__ == "__main__":
    fails_in_the_middle = (ok.s(1) | fail.s() | ok.s())
    fails_in_the_end = (ok.s(1) | fail.s())

    for c in [fails_in_the_end, fails_in_the_middle]:
        resp = c.delay()
        try:
            resp.get(timeout=5)
        except RuntimeError:
            print("runtime error (as expected)")
        except celery.TimeoutError:
            # QUESTION: How do I make this never happen for `fails_in_the_middle`?
            print("timed out (not expected)")

Наблюдения

  • fails_in_the_middle цепочка: resp.get(timeout=2) всегда приводит к TimeoutError;
  • fails_in_the_end: всегда повышается RuntimeError(2) и не ждет тайм-аута.

Похоже, что AsyncResult.get() блокирует навсегда, если в цепочке есть незавершенные задачи, но очистка цепочки установкой self.request.chain = None не помогает, Цепочка по-прежнему блокируется до истечения времени ожидания.

Приведенное выше поведение присутствует, когда Redis используется в качестве посредника сообщений и хранилища результатов. Когда задачи помечены как активные (выполняемые в текущем процессе), поведение будет таким, как ожидалось.

Был бы благодарен за любые идеи. Спасибо!

1 Ответ

0 голосов
/ 12 февраля 2020

Получил ответ от группы пользователей сельдерея , надеюсь, это поможет кому-то еще. Все кредиты go на инж. Josue Balandrano Coronel.

Есть предостережение, когда вы получаете результат, как вы это делаете. Цепочка - это набор задач, связанных вместе, когда вы resp = c.delay() ставите в очередь все задачи в цепочке

Объект, который возвращает c.delay(), является не указателем на всю цепочку, а указателем на последнее задание в цепочке. Это означает, что resp заканчивается указателем на результат последней задачи в цепочке, в данном случае второй ok.s(). Это означает, что если промежуточное задание завершится неудачно, и вы попытаетесь выполнить resp.get(), то произойдет тайм-аут, поскольку задача, на которую указывает resp, так и не была выполнена.

Это также объясняет поведение, которое вы наблюдаете fails_in_the_end потому что resp - это указатель на fail.s(), когда вы делаете resp.get() Celery выдаст ошибку, которая произошла в задании. Именно так Celery обрабатывает доступ к результатам задач, которые привели к ошибкам, и в этой задаче нет обработки ошибок.

Теперь, в зависимости от того, что вы действительно хотите сделать, есть несколько различных вариантов, которые вы можете сделать. В порядке рекомендации.

1) Вы можете добавить обработчики успеха и ошибок в цепочку, чтобы убедиться, что вы не работаете с неполной цепочкой.

@app.task(bind=True)
def success(self, result):
    print(f"Chain result: ${result}")
    print(f"Chain: ${self.chain}")

@app.task(bind=True)
def error(self, *args, **kwargs)
    print(f"args: ${args)")
    print(f"kwargs: ${kwargs}")

if __name__ == "__main__":
    fails_in_the_middle = (ok.s(1) | fail.s() | ok.s() | success.s()).on_error(error.s())
    fails_in_the_end = (ok.s(1) | fail.s() | success.s()).on_error(error.s())

2) Результаты цепочки доступа в виде графа с использованием resp.collect(intermediate=True) collect() возвращают все результаты в цепочке в виде ориентированного ациклического графа (DAG). Обычно легче преобразовать это в список, чтобы вы могли l oop просмотреть каждый из результатов в следующем порядке:

resp = c.delay()
for result in list(resp.collect(intermediate=True)):
    print(result.get())

3) Пройдите граф цепочек назад, пока не получите к родителю и l oop через детей, чтобы получить результаты. Поскольку то, что вы получите в resp, фактически является последней задачей в цепочке, вы можете пройти график назад до первой задачи в цепочке и получить оттуда результаты:

resp = c.delay()
parent = resp.parent

while parent is not None:
    parent = resp.parent

for child in parent.children:
    child.get() 
...