Python Celery: обновление модели django после изменения состояния - PullRequest
0 голосов
/ 13 октября 2018

Мне удалось найти 2 аналогичные темы с обсуждением этой проблемы, но, к сожалению, я не смог найти лучшее решение от нее:

  1. Обновление поля модели Django на основе сельдереяСтатус задачи
  2. Обновление поля модели Django на основе статуса задачи сельдерея

Я использую Django & Celery (+ redis в качестве брокера сообщений), и я бынравится обновлять модель django при изменении статуса задачи сельдерея (с ожидающих -> успешных, ожидающих -> сбоев) и т. д.

Мой код:

import time

from celery import shared_task

@shared_task(name="run_simulation")
def run_simulation(simulation_id: str):
    t1_start = time.perf_counter()
    doSomeWork() # we may change this to sleep for instance
    t1_end = time.perf_counter()
    return{'process_time': t1_end - t1_start}

и особый вид, из которого явызов задачи:

def run_simulation(request):
    form = SimulationForm(request.POST)
    if form.is_valid():
        new_simulation = form.save()
        new_simulation.save()
        task_id = tasks.run_simulation.delay(new_simulation.id)

Вопрос в том, какой предпочтительный способ обновить состояние модели django Simulation при изменении состояния задачи?

В документах я нашел обработчики, которыеиспользуют методы on_failure, on_success и т. д. http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers

1 Ответ

0 голосов
/ 14 октября 2018

Я не думаю, что есть предпочтительный способ сделать что-то подобное, так как это зависит от вашего проекта.Вы можете использовать задачу мониторинга, например, ссылку, которую вы отправили.Присвойте задаче идентификатор задачи и перепланируйте задачу, пока отслеживаемая задача не перейдет в состояние ЗАВЕРШЕНО.

from celery import AsyncResult

@app.task(bind=True)
def monitor_task(self, t_id):
    """Monitor a task"""
    res = AsyncResult(t_id, backend=self.backend, app=self.app)
    if res.ready():
        raise self.retry(
            countdown=10,
            exc=Exception("Main task not done yet.")
        )

Вы также можете создать приемник событий и проверить состояние задачи, а затем сохранить ее наDB.http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing

Теперь, если вас интересуют только состояния успеха и сбоя, вы можете создать обратные вызовы успеха и сбоя и сохранить в нем состояние успеха или сбоя.

tasks.run_simulation.apply_async(
    (sim_id,),
    link=tasks.success_handler.s(),
    link_error=tasks.error_handler()
)

http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks

...