Запуск периодического задания во время, хранящееся в базе данных - PullRequest
0 голосов
/ 11 мая 2018

В настоящее время у меня есть периодические задачи, настроенные с помощью Планировщика заданий в моем экземпляре Azure.Они запускают конечные точки API (Django) в фиксированное время.

Я хочу сделать эти времена динамическими (что не будет работать с этим решением).План состоит в том, чтобы выполнить эти задачи непосредственно из Джанго.Расписание будет храниться в моей базе данных (MySQL) и получено для создания запланированного задания.Когда эти значения изменены, планировщик также должен измениться соответствующим образом.

После просмотра Celery кажется, что при использовании периодических задач crontab расписаний может работать. Используя это, можно ли установить мое запланированное время на основе значений из моей базы данных?

Похоже, мне также понадобится экземпляр Redis. Поскольку я буду использовать Celery только для периодических задач, это все еще правильный подход?

Ответы [ 3 ]

0 голосов
/ 11 мая 2018

Я использую Django, Celery, RabbitMQ и postgreSQL.

Я делаю именно то, что вы хотите.

PIP: celery и flower

Вам нужен файл конфигурации Celery (в папке settings.py):

То, что вы хотите добавить, это beat_schedule:

app.conf.beat_schedule = {
    'task-name': {
        'task': 'myapp.tasks.task_name',
        'schedule': crontab(minute=30, hour=5, day_of_week='mon-fri'),
    },
}

Это добавит запись в вашу базу данных для выполнения task_name (с понедельника по пятницу в 5:30), которую вы можете изменить непосредственно в настройках (перезагрузите celery и celery beat после)

Что мне нравится, так это то, что вы можете легко добавить механизм повтора с безопасностью:

@app.task(bind=True, max_retries=50)
def task_name(self, entry_pk):
    entry = Entry.objects.get(pk=entry_pk)
    try:
        entry.method()
    except ValueError as e:
        raise self.retry(exc=e, countdown=5 * 60, queue="punctual_queue")

Когда мой method() рейз ValueError, я повторно выполню этот метод через 5 минут с максимальным числом 50 попыток.

Хорошая часть - у вас есть доступ к базе данных в админке Django: enter image description here

И вы можете проверить с помощью flower, выполнена ли задача или нет (с отслеживанием):

enter image description here

У меня ежедневно выполняется более 1000 задач, вам нужно создать очереди и работника.

Для этого я использую 10 рабочих (для будущих отвратительных целей):

celery multi start 10 -A MYAPP -Q:1-3 recurring_queue,punctual_queue -Q:4,5 punctual_queue -Q recurring_queue --pidfile="%n.pid"

И демон, запускающий задачу:

celery -A MYAPP beat -S django --detach

Возможно, это излишне для вас, но он может сделать для вас гораздо больше: - отправка электронной почты асинхронно (если она не удалась, вы можете исправить и повторно отправить письмо) - загрузить и постобработать асинхронные для пользователя - Каждое задание, которое требует времени, но вы не хотите ждать (вы можете связать задание, которое нужно завершить, вернуть результат и использовать его в другом задании)

0 голосов
/ 11 мая 2018

Без внешних библиотек, установите ежедневный скрипт cron, который получает текущие задачи из базы данных и использует многопоточность для запуска в это время.

def take_a_background_nap(time_to_send):
    while datetime.datetime.now() < time_to_send:
        time.sleep(60)
    print('finally running')
    return


threadObj = threading.Thread(target=take_a_background_nap, args=[datetime.datetime(2020, 5, 11, 12, 53, 0)],)
threadObj.start()

Вы можете иметь столько потоков, сколько хотите, но обратите вниманиек вопросам параллелизма.

0 голосов
/ 11 мая 2018

Cron, как полагают, запускает задачи, которые происходят периодически, поскольку у него есть простые настройки конфигурации на каждый день, каждый час, каждые 15 минут ...
Добавление заданий cron на самом деле не является хорошим способом настройки динамических задач для запуска на конкретную дату ( или datetime )

Вы можете использовать модуль schedule , как описано в этом ответе.

Существует также другая библиотека apscheduler , но проверьте, хорошо ли работает последняя версия с python3 (, если вы ее используете )

from datetime import date
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

# Define the function that is to be executed
def my_job(text):
    print text

# The job will be executed on November 6th, 2009
exec_date = date(2009, 11, 6)

# Store the job in a variable in case we want to cancel it
job = sched.add_date_job(my_job, exec_date, ['hello'])
...