Получить все результаты цепочки задач по идентификатору отдельно в Celery - PullRequest
0 голосов
/ 06 июня 2018

Я пытаюсь получить результаты всех связанных задач в сельдерее, которые хранятся в бэкенде результатов mysql.

Например, у меня есть следующие две задачи сельдерея,

@celery.task(name='celery_fl.add')
def add(x, y, value=None):
    if value is None:
        try:
            return x + y
        except TypeError:
            return None
    return value

@celery.task(name='celery_fl.mul')
def mul(x, y, value=None):
    if value is None:
        try:
            return x * y
        except TypeError:
            return None
    return value

, и вот как я их объединяю,

parent = (add.s(2, 2) | mul.s(8)).apply_async()

Вот вывод parent.get() будет результатом окончательного связанного задания.parent.parent.get () выдаст мне вывод первой связанной задачи.

Я пытаюсь добиться того, чтобы я хотел получить такой же вывод, используя идентификатор задачи на последнем этапе,

task_id = 'bc5fc4b1-613e-4ef0-b5c8-900999d9a6f1'
parent = AsyncResult(task_id, app=celery)

скажем, что идентификатор_задачи, который у меня есть, принадлежит ко второму заданию в цепочечном событии (родитель).Тогда я должен получить результат первой связанной задачи, если я наберу parent.parent.get ().Но почему-то я получаю None в качестве значения.Есть ли другой способ получить задачу с помощью task_id вместо AsyncResult ()?

1 Ответ

0 голосов
/ 05 декабря 2018

При использовании бэкэнда mysql для сохранения результатов результаты каждой связанной задачи сохраняются отдельно.Но экземпляр задачи больше недоступен, и без этого невозможно получить результаты подзадач, используя основную задачу (Ref - Задачи Celery ).

Таким образом, чтобы получить результаты всех задач, идентификатор задачи каждой задачи должен храниться где-то в базе данных.

Пример с использованием колбы (python),

chain = (s3_init.s(order.name, order.id)|create_order_sheet.s(order.id, order.name) | create_order_info.s(order.id, order.name))
res = chain()
process = {
   's3_init': res.parent.parent.parent.parent.parent.parent.id,
   'order_sheet': res.parent.parent.parent.parent.id,
   'order_info': res.parent.parent.parent.id
}
order.update(process_id=json.dumps(process))

Затем вы можете просто получить идентификаторы задач из базы данных и использовать celery.result.AsyncResult (task_id) для извлечения каждой задачи по идентификатору (ref - Async results ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...