Настройка: Python 3.7.3, Celery 4.3.0, RabbitMQ 3.7.16, Redis 5.0.4
Я пытаюсь создать приложение, которое показывает потоки холста. Я использую rabbitMQ в качестве посредника и Redis в качестве бэкэнда.
Рассмотрим следующий поток холста:
cf = add.si(0, 0) | group(add.si(1, 1), add.si(2, 2)) | add.si(3, 3)
res = c1.apply_async()
Запуск в режиме отладки и проверка res
, все выглядит нормально: res is AsyncResult
экземпляр последнего задания add.si(3, 3)
, у него нет дочерних элементов, а его родительский элемент GroupResult
экземпляр.
Если смотреть на GroupResult
дочерние элементы, также выглядит нормально, я вижу, что у него есть два дочерних элемента -group(add.si(1, 1)
и add.si(2, 2)
, а его родителем является AsyncResult
экземпляр add.si(0, 0)
.
Теперь я ожидал увидеть то же поведение, когда пытался получить объект AsyncResult
по task_id:
task_info = AsyncResult(<uuid of task add.si(3, 3)>, app=app)
Теперь я вижу, что task_info.parent
Нет. Почему это так?
Глядя дальше в Flower, я вижу task add.si(3, 3)
parent_id add.si(0, 0)
uuid - почему это так?
Я пытался реализовать прослушиватель цикла событий (аналогично Flower) и я не вижу задачу GroupResult
. Есть ли способ увидеть это? Это ошибка? это скрыто по замыслу? Я вижу в журналах Flower, что есть задача celery.group
. Есть ли способ разоблачить его?
[I 191107 19:48:36 command:147] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
Вот мой код:
import time
from celery import Celery, group
from celery.result import AsyncResult, GroupResult
def get_app():
return Celery(
'worker',
broker='pyamqp://localhost/vhost',
backend='redis://localhost/0'
)
app = get_app()
@app.task
def add(x, y):
time.sleep(1)
return x + y
cf = add.si(0, 0) | group(add.si(1, 1), add.si(2, 2)) | add.si(3, 3)
res = cf.apply_async()
time.sleep(5) # just in case - wait for the tasks to finish
print(type(res.parent)) # <class 'celery.result.GroupResult'>
print(len(res.parent.children)) # 2
print(type(res.parent.parent)) # <class 'celery.result.AsyncResult'>
print(res.parent.parent.result) # 0
same_res = AsyncResult(res.id, app=app)
print(type(same_res.parent)) # <class 'NoneType'>
# print(len(same_res.parent.children)) # 'NoneType' object has no attribute 'children'
# print(type(same_res.parent.parent)) # 'NoneType' object has no attribute 'parent'
# print(same_res.parent.parent.result) # 'NoneType' object has no attribute 'parent'
same_group_res = GroupResult(res.parent.id, app=app)
print(same_group_res.parent) # None
print(same_group_res.children) # None
Я открыл вопрос здесь .