Как настроить Celery для вызова пользовательской функции инициализации перед выполнением моих задач? - PullRequest
18 голосов
/ 25 января 2010

У меня есть проект Django, и я пытаюсь использовать Celery для отправки задач для фоновой обработки (http://ask.github.com/celery/introduction.html). Celery хорошо интегрируется с Django, и я смог отправить свои собственные задания и получить результаты.

Единственная проблема в том, что я не могу найти разумный способ выполнения пользовательской инициализации в процессе демона. Мне нужно вызвать дорогую функцию, которая загружает много памяти, прежде чем начать обработку задач, и я не могу позволить себе вызывать эту функцию каждый раз.

Кто-нибудь имел эту проблему раньше? Любые идеи, как обойти это без изменения исходного кода Celery?

Спасибо

1 Ответ

16 голосов
/ 27 января 2010

Вы можете написать собственный загрузчик или использовать сигналы.

В загрузчиках есть метод on_task_init, который вызывается, когда задача должна быть выполнена, и on_worker_init, который вызывается основным процессом сельдерея + сельдерея.

Использование сигналов, вероятно, самое простое, доступны следующие сигналы:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Отправляется, когда задача собирается быть выполнена работником (или локально при использовании apply / или если было установлено CELERY_ALWAYS_EAGER).

  • task_postrun(task_id, task, args, kwargs, retval) Отправляется после выполнения задачи в тех же условиях, что и выше.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Вызывается при применении задачи (не подходит для длительных операций)

Дополнительные сигналы доступны в 0.9.x (текущая основная ветвь на github):

  • worker_init()

    Вызывается при запуске celeryd (до инициализации задачи, поэтому если система, поддерживающая fork, любые изменения памяти будут скопированы на ребенка рабочие процессы).

  • worker_ready()

    Вызывается, когда celeryd может получать задания.

  • worker_shutdown()

    Вызывается при выключении сельдерея.

Вот пример предварительного вычисления чего-либо при первом запуске задачи в процессе:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it's empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

Если вы хотите, чтобы функция запускалась для всех задач, просто пропустите аргумент sender.

...