Задача сельдерея и настроить декоратор - PullRequest
13 голосов
/ 18 июня 2011

Я работаю над проектом с использованием django и сельдерея (django-celery). Наша команда решила обернуть весь код доступа к данным в (app-name)/manager.py (НЕ вписывать в менеджеры, как django), и позволить коду в (app-name) /task.py только заниматься сборкой и выполнять задачи с сельдереем (так у нас нет зависимости django ORM в этом слое).

В моем manager.py у меня есть что-то вроде этого:

def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()

В моем файле task.py мне нравится заключать их в задачи (затем, возможно, использовать эти задачи для выполнения более сложных задач), поэтому я пишу этот декоратор:

import manager #the module within same app containing data access functions

class mfunc_to_task(object):
    def __init__(mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)

            mfunc = getattr(manager, f.__name__)

            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif self.mfunc_type == 'get_or_create':
                    subtask(callback).delay(result[0])
                else:
                    subtask(callback).delay()
            return result            

        return wrapper_f

затем (все еще в task.py):

#@task
@mfunc_to_task()
def get_tag():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos_count():
    pass

Все отлично работает без @task. Но после применения этого @task декоратора (к началу, как указано в документации по сельдерею), вещи просто начинают разваливаться. Очевидно, что каждый раз, когда вызывается mfunc_to_task.__call__, та же самая функция task.get_tag передается как f. Поэтому я получал один и тот же wrapper_f каждый раз, и теперь единственное, что я могу сделать, это получить один тег.

Я новичок в декораторах. Кто-нибудь может помочь мне понять, что здесь пошло не так, или указать другие способы достижения этой цели? Мне очень не хочется писать один и тот же код переноса задачи для каждой из моих функций доступа к данным.

Ответы [ 2 ]

18 голосов
/ 05 марта 2013

Не совсем уверен, почему передача аргументов не работает?

, если вы используете этот пример:

@task()
def add(x, y):
    return x + y

позволяет добавить некоторые записи в MyCoolTask:

from celery import task
from celery.registry import tasks

import logging
import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        logger.info("Ending run")
        pass

и создайте расширенный класс (расширяющий MyCoolTask, но теперь с аргументами):

class AddTask(MyCoolTask):

    def run(self,x,y):
        if x and y:
            result=add(x,y)
            logger.info('result = %d' % result)
            return result
        else:
            logger.error('No x or y in arguments')

tasks.register(AddTask)

и убедитесь, что вы передаете kwargs как данные json:

{"x":8,"y":9}

Я получаю результат:

[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17
8 голосов
/ 18 июня 2011

Вместо использования декоратора, почему вы не создаете базовый класс, расширяющий celery.Task?

Таким образом, все ваши задачи могут расширять ваш настроенный класс задач, где вы можете реализовать свое личное поведение, используя методы __call__ и after_return , Вы также можете определить общие методы и объекты для всех ваших задач.

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        pass
...