Получил ответ от группы пользователей сельдерея , надеюсь, это поможет кому-то еще. Все кредиты go на инж. Josue Balandrano Coronel.
Есть предостережение, когда вы получаете результат, как вы это делаете. Цепочка - это набор задач, связанных вместе, когда вы resp = c.delay()
ставите в очередь все задачи в цепочке
Объект, который возвращает c.delay()
, является не указателем на всю цепочку, а указателем на последнее задание в цепочке. Это означает, что resp
заканчивается указателем на результат последней задачи в цепочке, в данном случае второй ok.s()
. Это означает, что если промежуточное задание завершится неудачно, и вы попытаетесь выполнить resp.get()
, то произойдет тайм-аут, поскольку задача, на которую указывает resp
, так и не была выполнена.
Это также объясняет поведение, которое вы наблюдаете fails_in_the_end
потому что resp
- это указатель на fail.s()
, когда вы делаете resp.get()
Celery выдаст ошибку, которая произошла в задании. Именно так Celery обрабатывает доступ к результатам задач, которые привели к ошибкам, и в этой задаче нет обработки ошибок.
Теперь, в зависимости от того, что вы действительно хотите сделать, есть несколько различных вариантов, которые вы можете сделать. В порядке рекомендации.
1) Вы можете добавить обработчики успеха и ошибок в цепочку, чтобы убедиться, что вы не работаете с неполной цепочкой.
@app.task(bind=True)
def success(self, result):
print(f"Chain result: ${result}")
print(f"Chain: ${self.chain}")
@app.task(bind=True)
def error(self, *args, **kwargs)
print(f"args: ${args)")
print(f"kwargs: ${kwargs}")
if __name__ == "__main__":
fails_in_the_middle = (ok.s(1) | fail.s() | ok.s() | success.s()).on_error(error.s())
fails_in_the_end = (ok.s(1) | fail.s() | success.s()).on_error(error.s())
2) Результаты цепочки доступа в виде графа с использованием resp.collect(intermediate=True)
collect()
возвращают все результаты в цепочке в виде ориентированного ациклического графа (DAG). Обычно легче преобразовать это в список, чтобы вы могли l oop просмотреть каждый из результатов в следующем порядке:
resp = c.delay()
for result in list(resp.collect(intermediate=True)):
print(result.get())
3) Пройдите граф цепочек назад, пока не получите к родителю и l oop через детей, чтобы получить результаты. Поскольку то, что вы получите в resp
, фактически является последней задачей в цепочке, вы можете пройти график назад до первой задачи в цепочке и получить оттуда результаты:
resp = c.delay()
parent = resp.parent
while parent is not None:
parent = resp.parent
for child in parent.children:
child.get()