Задача сельдерея, которая запускает больше задач - PullRequest
45 голосов
/ 15 июня 2011

Я использую сельдерея для запуска основной задачи, которая запускает ряд дополнительных задач. У меня есть обе задачи уже написаны.

Есть ли способ легко сделать это? Позволяет ли Celery запускать задачи изнутри?

Мой пример:

@task
def compute(users=None):
    if users is None:
        users = User.objects.all()

    tasks = []
    for user in users:
        tasks.append(compute_for_user.subtask((user.id,)))

    job = TaskSet(tasks)
    job.apply_async() # raises a IOError: Socket closed

@task
def compute_for_user(user_id):
    #do some stuff

compute вызывается из сельдерея, но вызывает IOError при попытке запустить apply_async. Есть идеи?

Ответы [ 4 ]

29 голосов
/ 20 июня 2011

Чтобы ответить на ваши начальные вопросы: Начиная с версии 2.0, Celery предоставляет простой способ запуска задач из других задач. То, что вы называете «второстепенными задачами», это то, что называется «подзадачами». См. Документацию для Наборов задач, подзадач и обратных вызовов , на которые @Paperino был достаточно любезен для ссылки.

Для версии 3.0, Celery изменен на использование групп для этого и других типов поведения.

Ваш код показывает, что вы уже знакомы с этим интерфейсом. Похоже, что ваш настоящий вопрос звучит так: «Почему я получаю« Socket Closed »IOError, когда пытаюсь запустить свой набор подзадач?» Я не думаю, что кто-то может ответить на этот вопрос, потому что вы не предоставили достаточно информации о своей программе. Ваш отрывок не может быть запущен как есть, поэтому мы не можем исследовать проблему, с которой вы столкнулись. Пожалуйста, опубликуйте трассировку стека, поставляемую с IOError, и, если вам повезет, придет кто-то, кто может помочь вам с вашим сбоем.

9 голосов
/ 21 октября 2013

Вы можете использовать что-то вроде этого (Поддержка в 3.0)

g = group(compute_for_user.s(user.id) for user in users)
g.apply_async()
9 голосов
/ 06 марта 2013

И поскольку версия 3.0 «TaskSet» больше не является термином ... Группы, цепочки и аккорды как особый тип подзадач - это новое, см. http://docs.celeryproject.org/en/3.1/whatsnew-3.0.html#group-chord-chain-are-now-subtasks

0 голосов
/ 16 ноября 2018

Для упомянутой ошибки IOError, хотя приведенной здесь информации недостаточно для определения причины ее возникновения, я предполагаю, что вы пытались установить соединение внутри функции задачи, поэтому всякий раз, когда вызывается задача, создается новое соединение. Если задача будет вызвана тысячу раз, будет тысяча соединений. Это затопит системный менеджер сокетов, и IOError является его жалобой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...