В настоящее время у меня есть партия сельдерея, работающая с django примерно так:
Celery.py:
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")
@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)
....
@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)
Моя цель - когда файл сельдерея редактируется, он перезапускает все задача - удалить задачу из очереди, которая еще не выполняется (я делаю это, потому что recursion_function
запустится снова, если завершит свою работу по проверке каждой записи таблицы в моей базе данных, поэтому я не беспокоюсь, что она остановится на полпути ).
Функция check_loop
вызовет API, который имеет подкачку для возврата списка объектов, и я сравню его с записью в таблице, если совпадение затем создаст новую пользовательскую запись другой модели
Мой вопрос: когда я удаляю все сообщения, текущее задание будет остановлено на полпути или оно продолжит выполняться? потому что если функция check_loop
остановит на полпути цикл по списку API, то он снова запустит l oop и создаст новую дублирующую запись, которую я не хочу
ПРИМЕР:
во время выполнения задачи check_loop()
он создал объект на полпути (в списке API от элемента id = 2 до id = 5), перезапуск сервера -> запустить снова, теперь check_loop()
запустить с начала (на API список от элемента id = 2 до id = 5) и снова создал объект из этого списка (какие 100% я не хочу)
Это так? мне просто нужно подтверждение
РЕДАКТИРОВАТЬ:
https://docs.celeryproject.org/en/4.4.1/faq.html#how -do-i-purge-all-Ожидающие задачи
Я добавил app.control.purge()
, потому что когда я перезагружаюсь, recursion_function
снова вызывают в setup_periodic_tasks
, в то время как предыдущий recursion_function
из recursion_function.apply_async(countdown=30)
тоже выполняется, поэтому он умножается