Celery, что происходит с запущенными задачами при использовании app.control.purge ()? - PullRequest
1 голос
/ 06 марта 2020

В настоящее время у меня есть партия сельдерея, работающая с django примерно так:

Celery.py:

from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django

load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
   app.control.purge()
   sender.add_periodic_task(30.0, check_loop.s())
   recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
   print("setup_periodic_tasks")

@app.task()
def check_loop():
    .....
    start = database start number
    end = database end number
    callling apis in a list from id=start to id=end
    create objects
    update database(start number = end, end number = end + 3)

    ....


@app.task()
def recursion_function(default_retry_delay=10):
   .....
   do some looping
   ....
   #when finished, call itself again
   recursion_function.apply_async(countdown=30)

Моя цель - когда файл сельдерея редактируется, он перезапускает все задача - удалить задачу из очереди, которая еще не выполняется (я делаю это, потому что recursion_function запустится снова, если завершит свою работу по проверке каждой записи таблицы в моей базе данных, поэтому я не беспокоюсь, что она остановится на полпути ).

Функция check_loop вызовет API, который имеет подкачку для возврата списка объектов, и я сравню его с записью в таблице, если совпадение затем создаст новую пользовательскую запись другой модели

Мой вопрос: когда я удаляю все сообщения, текущее задание будет остановлено на полпути или оно продолжит выполняться? потому что если функция check_loop остановит на полпути цикл по списку API, то он снова запустит l oop и создаст новую дублирующую запись, которую я не хочу

ПРИМЕР:

во время выполнения задачи check_loop() он создал объект на полпути (в списке API от элемента id = 2 до id = 5), перезапуск сервера -> запустить снова, теперь check_loop() запустить с начала (на API список от элемента id = 2 до id = 5) и снова создал объект из этого списка (какие 100% я не хочу)

Это так? мне просто нужно подтверждение

РЕДАКТИРОВАТЬ:

https://docs.celeryproject.org/en/4.4.1/faq.html#how -do-i-purge-all-Ожидающие задачи

Я добавил app.control.purge(), потому что когда я перезагружаюсь, recursion_function снова вызывают в setup_periodic_tasks, в то время как предыдущий recursion_function из recursion_function.apply_async(countdown=30) тоже выполняется, поэтому он умножается

Ответы [ 2 ]

1 голос
/ 12 марта 2020

Я не собираюсь писать эссе, как превосходный пост Олега выше. Ответ прост - все запущенные задачи продолжат выполняться . purge - это все задачи, которые находятся в очереди, ожидая, чтобы их выбрали рабочие из сельдерея.

1 голос
/ 12 марта 2020

Да , рабочий продолжит выполнение текущей задачи , если рабочий также не будет перезапущен.

Также Путь Сельдерея означает всегда ожидать, что задачи будут выполняться в параллельной среде со следующими соображениями:

  • есть много задач, выполняющихся одновременно
  • есть много сельдерей работники, выполняющие задачи
  • одна и та же задача может быть запущена снова
  • несколько экземпляров одной и той же задачи могут выполняться одновременно
  • любая задача может быть прервана в любое время

даже если вы уверены, что в вашей среде есть только один рабочий, запущенный / остановленный вручную , и это не применяется - задачи должны создаваться таким образом, чтобы все это происходило.

Некоторые полезные приемы:

  • использование транзакций базы данных
  • использование блокировки
  • разбиение выполняемых задач на более быстрые
  • если Задача имеет промежуточные значения для быть сохранены или они важны (то есть не воспроизводимы, как некоторые вызовы API), и их обработка на следующем шаге требует времени - рассмотрите возможность разделения на несколько связанных задач

Если вам нужно запустить только одну экземпляр задачи за раз - используйте какую-то блокировку - создайте / обновите запись блокировки в базе данных или в кэше, чтобы другие (те же задачи) могли проверить и узнать это задание выполняется и просто вернитесь или дождитесь завершения предыдущего.

Т.е. recursion_function также может быть Periodic Task. Будучи периодической c задачей, она будет выполняться каждый интервал, даже если предыдущая не удалась по какой-либо причине (и, следовательно, не сможет снова поставить себя в очередь, как в обычной непериодической c задаче). С блокировкой вы можете убедиться, что одновременно работает только один.


check_loop():

Во-первых, рекомендуется сохранить результаты в одной транзакции в базе данных, чтобы убедиться, что все или ничего не сохранены / изменены в базе данных.

Вы также можете сохранить некоторый маркер, который указывает, сколько / статус сохраненных объектов, поэтому будущие задачи могут просто проверить этот маркер, а не каждый object.

Или каким-либо образом выполнить проверку каждого элемента перед его созданием на предмет того, что он уже существует в базе данных.

...