Собственные параметры для метода PeriodicTask run () в Celery - PullRequest
2 голосов
/ 17 мая 2010

Я пишу небольшое приложение на Django, и я должен быть в состоянии создать для каждого модельного объекта свое периодическое задание, которое будет выполнено с определенный интервал. Я использую для этого приложение Celery, но я не могу понять одну вещь:

class ProcessQueryTask(PeriodicTask):
   run_every = timedelta(minutes=1)

   def run(self, query_task_pk, **kwargs):
       logging.info('Process celery task for QueryTask %d' %
query_task_pk)
       task = QueryTask.objects.get(pk=query_task_pk)
       task.exec_task()
       return True

Тогда я делаю следующее:

>>> from tasks.tasks import ProcessQueryTask
>>> result1 = ProcessQueryTask.delay(query_task_pk=1)
>>> result2 = ProcessQueryTask.delay(query_task_pk=2)

Первый вызов успешен, но другие периодические вызовы возвращают ошибку - TypeError: run () принимает ровно 2 аргумента без ключевых слов (1 дан) в сервер сельдерея. Могу ли я передать собственные параметры в PeriodicTask run()?

1 Ответ

5 голосов
/ 08 декабря 2010

На этот вопрос прекрасно ответил Ask Solem в свой ответ на ваш вопрос в группе пользователей сельдерея Google .

Периодические задачи не используют аргументы, поэтому вам нужно сделать несколько классы или сделать периодическое задание, которое обрабатывает более одной "модели".

например:.

from celery.task import PeriodicTask
from celery.decorators import periodic_task

# base class
class BaseProcessQueryTask(PeriodicTask):
    abstract = True
    run_every = timedelta(minutes=1)
    query_task_pk  = None

    def run(self):
        task = QueryTask.objects.get(pk=self.query_task_pk)
        task.exec_task()

class ProcessQueryTask1(BaseProcessQueryTask):
    query_task_pk = 1

class ProcessQueryTask2(BaseProcessQueryTask):
    query_task_pk = 2

но, скорее всего, вы захотите что-то вроде этого:

@task(ignore_result=True)
def execute_query_task(task):
    task.exec_task()

@periodic_task(run_every=timedelta(minutes=1))
def process_query_tasks():
    for task in QueryTask.objects.all():
        ExecuteQueryTask.delay(task)
...