Я ищу способ для пользователя добавить задачу periodi c в обычную задачу сельдерея. Например, я хочу создать отчет сейчас и запланировать тот же отчет, например, на 16:00 каждый день.
Часть, которая работает: Внутри @celery.on_after_finalize.connect
У меня есть приложение, которое просматривает базу данных для PendingReports, а затем вызываю add_periodic_task
для каждой строки. Задача periodi c будет выполнена в 16:00.
Часть, которая не работает: когда я вызываю add_periodic_task
внутри обычной функции @celery.task
. Пока приложение работает, пользователь может создавать отчет сейчас, а также каждый день в 16:00. Эта задача не будет добавлена, но она будет обработана, если приложение будет закрыто и перезапущено - после вызова @celery.on_after_finalize.connect
. Так что кажется, что add_periodic_task
внутри @celery.task
просто ничего не делает.
Вот как я добавляю задачу periodi c внутрь обычной задачи:
@celery.task(bind=True)
def process_sched_event_async(self, _id, add_periodic=True):
job_date = ...
...
if add_periodic:
print(celery.conf.beat_schedule)
celery.add_periodic_task(
crontab(hour=job_date.hour, minute=job_date.minute),
process_sched_event_async.s(event['id'], add_periodic=False),
)
print(celery.conf.beat_schedule)
Одна вещь Следует отметить, что независимо от того, что запланировано, первый оператор печати всегда {}
, а второй всегда имеет одну запись для только что добавленного.
Вот мой on_finalize_connect:
@celery.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
scheduled_events = ...
for event in scheduled_events:
job_date = ...
sender.add_periodic_task(
crontab(hour=job_date.hour, minute=job_date.minute),
process_sched_event_async.s(event['id']),
)
Так почему же задача periodi c не записывается, когда она вызывается в обычной задаче сельдерея? Как я могу динамически добавлять задачу periodi c во время выполнения? Также вот мой звонок сельдерея и сельдерея:
celery worker -A celery_worker.celery -E --loglevel=debug
celery beat -A celery_worker.celery --loglevel=debug
gunicorn -c ./gunicorn.conf.py wsgi:app