Вы можете написать собственный загрузчик или использовать сигналы.
В загрузчиках есть метод on_task_init
, который вызывается, когда задача должна быть выполнена,
и on_worker_init
, который вызывается основным процессом сельдерея + сельдерея.
Использование сигналов, вероятно, самое простое, доступны следующие сигналы:
0.8.x:
task_prerun(task_id, task, args, kwargs)
Отправляется, когда задача собирается быть выполнена работником (или локально
при использовании apply
/ или если было установлено CELERY_ALWAYS_EAGER
).
task_postrun(task_id, task, args, kwargs, retval)
Отправляется после выполнения задачи в тех же условиях, что и выше.
task_sent(task_id, task, args, kwargs, eta, taskset)
Вызывается при применении задачи (не подходит для длительных операций)
Дополнительные сигналы доступны в 0.9.x (текущая основная ветвь на github):
worker_init()
Вызывается при запуске celeryd (до инициализации задачи, поэтому если
система, поддерживающая fork
, любые изменения памяти будут скопированы на ребенка
рабочие процессы).
worker_ready()
Вызывается, когда celeryd может получать задания.
worker_shutdown()
Вызывается при выключении сельдерея.
Вот пример предварительного вычисления чего-либо при первом запуске задачи в процессе:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Если вы хотите, чтобы функция запускалась для всех задач, просто пропустите аргумент sender
.