Celery beat не выполняет периодические c задачи, добавленные во время выполнения - PullRequest
0 голосов
/ 08 мая 2020

Я ищу способ для пользователя добавить задачу periodi c в обычную задачу сельдерея. Например, я хочу создать отчет сейчас и запланировать тот же отчет, например, на 16:00 каждый день.

Часть, которая работает: Внутри @celery.on_after_finalize.connect У меня есть приложение, которое просматривает базу данных для PendingReports, а затем вызываю add_periodic_task для каждой строки. Задача periodi c будет выполнена в 16:00.

Часть, которая не работает: когда я вызываю add_periodic_task внутри обычной функции @celery.task. Пока приложение работает, пользователь может создавать отчет сейчас, а также каждый день в 16:00. Эта задача не будет добавлена, но она будет обработана, если приложение будет закрыто и перезапущено - после вызова @celery.on_after_finalize.connect. Так что кажется, что add_periodic_task внутри @celery.task просто ничего не делает.

Вот как я добавляю задачу periodi c внутрь обычной задачи:

@celery.task(bind=True)
def process_sched_event_async(self, _id, add_periodic=True):
    job_date = ...
    ...
    if add_periodic:
        print(celery.conf.beat_schedule)
        celery.add_periodic_task(
            crontab(hour=job_date.hour, minute=job_date.minute),
            process_sched_event_async.s(event['id'], add_periodic=False),
        )
        print(celery.conf.beat_schedule)

Одна вещь Следует отметить, что независимо от того, что запланировано, первый оператор печати всегда {}, а второй всегда имеет одну запись для только что добавленного.

Вот мой on_finalize_connect:

@celery.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    scheduled_events = ...

    for event in scheduled_events:
        job_date = ...
        sender.add_periodic_task(
            crontab(hour=job_date.hour, minute=job_date.minute),
            process_sched_event_async.s(event['id']),
        )

Так почему же задача periodi c не записывается, когда она вызывается в обычной задаче сельдерея? Как я могу динамически добавлять задачу periodi c во время выполнения? Также вот мой звонок сельдерея и сельдерея:

celery worker -A celery_worker.celery -E --loglevel=debug

celery beat -A celery_worker.celery --loglevel=debug

gunicorn -c ./gunicorn.conf.py wsgi:app
...