Python / Celery: как убить подзадачи при уничтожении родительской задачи? - PullRequest
0 голосов
/ 19 декабря 2018

Контекст

Я создал приложение Django, которое вызывает задачу сельдерея, которая, в свою очередь, порождает другие задачи и ожидает их завершения.

Вотрабочий процесс:

1) Основной код python / django запускает задачу сельдерея в фоновом режиме

2) Задача сельдерея обрабатывает некоторый код, а затем запускает группу разных задач сельдерея и ждетони должны быть готовы

3) затем каждое задание группы порождает другую группу подзадач таким же образом и ждет, пока они завершат

Это работает хорошо (хотя я начинающий ивероятно, реализовал это плохо) но теперь я хотел бы иметь возможность завершать все дочерние процессы, если я убиваю основные задачи сельдерея, начатые в начале.

Что у меня есть до сих пор

Я воссоздаю ситуацию, используя простые родительские задачи, порождающие несколько дочерних задач, и я изменил метод on_failure класса задачи сельдерея, чтобы убить его дочерний элемент в случае сбоя.

Tasks.py

from celery import Celery, group,Task, result
from celery.signals import task_revoked
import time
from pprint import pprint
application = Celery('tasks',backend='amqp://',broker='amqp://guest@localhost//')


class MyTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(self.AsyncResult(task_id).children[0].revoke(terminate=True,signal='SIGTERM'))
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@application.task(base=MyTask)
def childTask():
    while True:
        time.sleep(10)
        print("Message de la tache enfant")
        continue

@application.task(base=MyTask)
def parentTask(pra_id = None):
    child_tasks = []
    print("Lancement tache mère")
    child_tasks.append(childTask.s())
    child_tasks.append(childTask.s())
    child_tasks.append(childTask.s())
    tasks = group(child_tasks)
    tasks.apply_async()

    time.sleep(15)
    raise KeyError

main.py

from tasks import parentTask

parent1 = parentTask.delay(pra_id = 10)
parent2 = parentTask.delay(pra_id = 20)

Когда код вызывает ошибку, родительская задача успешно завершается иего дочерние задачи тоже, это то, что я хочу.

Что мне нужно

Мне нужно иметь возможность вручную убить мою родительскую задачу из моего приложения django.

Это делается путем проверки работника сельдерея и поиска моей задачи путем поиска его аргументов, однако это было успешно сделано, однако, когда я вручную отменяю задачу сельдерея, как только я ее обнаружил, она не завершает дочерние задачи, порожденныеэто задание и вот что мне нужно.

То, что я пробовал до сих пор

Я пытался создать функцию, запускаемую сигналом "revoke"

(http://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked)

, который будет выполняться при отмене задачи.

Захват сигнала работал (я смог выполнить некоторый код при отзыве задачи), ноЯ не смог использовать тот же код, что и мой метод "On_failure", описанный выше), чтобы убить тОн выполняет дочерние задачи.

Задача

Объект запроса, отправляемый в функцию, содержит мою родительскую задачу, но свойство "children" класса пустое, если оно должно содержатьобъект GroupResult, содержащий дочерние задачи.

...