отсутствует родитель и потомки в AsyncResult / GroupResult - PullRequest
0 голосов
/ 09 ноября 2019

Настройка: Python 3.7.3, Celery 4.3.0, RabbitMQ 3.7.16, Redis 5.0.4

Я пытаюсь создать приложение, которое показывает потоки холста. Я использую rabbitMQ в качестве посредника и Redis в качестве бэкэнда.

Рассмотрим следующий поток холста:

cf = add.si(0, 0) | group(add.si(1, 1), add.si(2, 2)) | add.si(3, 3)
res = c1.apply_async()

Запуск в режиме отладки и проверка res, все выглядит нормально: res is AsyncResult экземпляр последнего задания add.si(3, 3), у него нет дочерних элементов, а его родительский элемент GroupResult экземпляр.

Если смотреть на GroupResult дочерние элементы, также выглядит нормально, я вижу, что у него есть два дочерних элемента -group(add.si(1, 1) и add.si(2, 2), а его родителем является AsyncResult экземпляр add.si(0, 0).

Теперь я ожидал увидеть то же поведение, когда пытался получить объект AsyncResult по task_id:

task_info = AsyncResult(<uuid of task add.si(3, 3)>, app=app)

Теперь я вижу, что task_info.parent Нет. Почему это так?

Глядя дальше в Flower, я вижу task add.si(3, 3) parent_id add.si(0, 0) uuid - почему это так?

Я пытался реализовать прослушиватель цикла событий (аналогично Flower) и я не вижу задачу GroupResult. Есть ли способ увидеть это? Это ошибка? это скрыто по замыслу? Я вижу в журналах Flower, что есть задача celery.group. Есть ли способ разоблачить его?

[I 191107 19:48:36 command:147] Registered tasks:
    ['celery.accumulate',
      'celery.backend_cleanup',
      'celery.chain',
      'celery.chord',
      'celery.chord_unlock',
      'celery.chunks',
      'celery.group',
      'celery.map',
      'celery.starmap']

Вот мой код:

import time
from celery import Celery, group
from celery.result import AsyncResult, GroupResult


def get_app():
    return Celery(
        'worker',
        broker='pyamqp://localhost/vhost',
        backend='redis://localhost/0'
    )


app = get_app()


@app.task
def add(x, y):
    time.sleep(1)
    return x + y


cf = add.si(0, 0) | group(add.si(1, 1), add.si(2, 2)) | add.si(3, 3)
res = cf.apply_async()

time.sleep(5)  # just in case - wait for the tasks to finish

print(type(res.parent))             # <class 'celery.result.GroupResult'>
print(len(res.parent.children))     # 2
print(type(res.parent.parent))      # <class 'celery.result.AsyncResult'>
print(res.parent.parent.result)     # 0


same_res = AsyncResult(res.id, app=app)
print(type(same_res.parent))             # <class 'NoneType'>
# print(len(same_res.parent.children))     # 'NoneType' object has no attribute 'children'
# print(type(same_res.parent.parent))      # 'NoneType' object has no attribute 'parent'
# print(same_res.parent.parent.result)     # 'NoneType' object has no attribute 'parent'


same_group_res = GroupResult(res.parent.id, app=app)
print(same_group_res.parent)    # None
print(same_group_res.children)  # None

Я открыл вопрос здесь .

...