Контекст
Я создал приложение Django, которое вызывает задачу сельдерея, которая, в свою очередь, порождает другие задачи и ожидает их завершения.
Вотрабочий процесс:
1) Основной код python / django запускает задачу сельдерея в фоновом режиме
2) Задача сельдерея обрабатывает некоторый код, а затем запускает группу разных задач сельдерея и ждетони должны быть готовы
3) затем каждое задание группы порождает другую группу подзадач таким же образом и ждет, пока они завершат
Это работает хорошо (хотя я начинающий ивероятно, реализовал это плохо) но теперь я хотел бы иметь возможность завершать все дочерние процессы, если я убиваю основные задачи сельдерея, начатые в начале.
Что у меня есть до сих пор
Я воссоздаю ситуацию, используя простые родительские задачи, порождающие несколько дочерних задач, и я изменил метод on_failure класса задачи сельдерея, чтобы убить его дочерний элемент в случае сбоя.
Tasks.py
from celery import Celery, group,Task, result
from celery.signals import task_revoked
import time
from pprint import pprint
application = Celery('tasks',backend='amqp://',broker='amqp://guest@localhost//')
class MyTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(self.AsyncResult(task_id).children[0].revoke(terminate=True,signal='SIGTERM'))
print('{0!r} failed: {1!r}'.format(task_id, exc))
@application.task(base=MyTask)
def childTask():
while True:
time.sleep(10)
print("Message de la tache enfant")
continue
@application.task(base=MyTask)
def parentTask(pra_id = None):
child_tasks = []
print("Lancement tache mère")
child_tasks.append(childTask.s())
child_tasks.append(childTask.s())
child_tasks.append(childTask.s())
tasks = group(child_tasks)
tasks.apply_async()
time.sleep(15)
raise KeyError
main.py
from tasks import parentTask
parent1 = parentTask.delay(pra_id = 10)
parent2 = parentTask.delay(pra_id = 20)
Когда код вызывает ошибку, родительская задача успешно завершается иего дочерние задачи тоже, это то, что я хочу.
Что мне нужно
Мне нужно иметь возможность вручную убить мою родительскую задачу из моего приложения django.
Это делается путем проверки работника сельдерея и поиска моей задачи путем поиска его аргументов, однако это было успешно сделано, однако, когда я вручную отменяю задачу сельдерея, как только я ее обнаружил, она не завершает дочерние задачи, порожденныеэто задание и вот что мне нужно.
То, что я пробовал до сих пор
Я пытался создать функцию, запускаемую сигналом "revoke"
(http://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked)
, который будет выполняться при отмене задачи.
Захват сигнала работал (я смог выполнить некоторый код при отзыве задачи), ноЯ не смог использовать тот же код, что и мой метод "On_failure", описанный выше), чтобы убить тОн выполняет дочерние задачи.
Задача
Объект запроса, отправляемый в функцию, содержит мою родительскую задачу, но свойство "children" класса пустое, если оно должно содержатьобъект GroupResult, содержащий дочерние задачи.