Захват неудавшихся задач от работников Celery во время выполнения задачи - PullRequest
2 голосов
/ 18 января 2020

В настоящее время я использую метод задержки в сельдерее (с RabbitMQ ) для выполнения асинхронных задач. Задачи распределены по 8 работникам, и в этих задачах я выполняю операцию вставки базы данных.

Иногда некоторые из задач терпят неудачу , и вставка базы данных не происходит из-за тайм-аут (или) Json ошибки декодирования , и я хочу захватить те конкретные задачи , которые не выполняются. Ниже приведены мои фрагменты кода Celery, которые я использую в своем Django Project .

views.py

def celery_url_filtering(request):
    for each_data in dataset:
        #each_data is a Json object
        res = result.delay(each_data)
    while(res.status == 'PENDING'):
         pass
    return JsonResponse({'ok':'Success'})

tasks.py

@app.task
def result(dataeach_data:
    # Parse each_data  and do data insertion here
    return "Something"

Пожалуйста, предложите обходной путь для записи неудачных задач в списке.

1 Ответ

0 голосов
/ 20 января 2020

Из вашего беспокойства я могу понять, что вы хотите изучить задачи «ОЖИДАНИЕ» или «СБОЙ» и повторить / применить некоторые логи приложений c.

Если это так, вы можете запустить cron по фиксированному расписанию, ежедневно / ежечасно и т.д. c, в зависимости от требований. Это задание cron может захватывать задачи, которые не были выполнены в последний день / час, в соответствии с вашим графиком.

Вы можете использовать django -celery-beat для задания задания cron и django -celery-results для сохранения результатов задания сельдерея с использованием Django ORM.

Например, у вас может быть задание на сельдерее, подобное этому

from celery import shared_task
from django_celery_results.models import TaskResult

**tasks.py**
@shared_task(name="failed_task_cron", bind=True)
def failed_task_cron(self, **kwargs):
    """
    Celery task to run on daily schedule to do something with the failed tasks
    """
    tasks = TaskResult.objects.filter(status='FAILURE')
    # tasks is a queryset of all the failed tasks
    # Perform application logic

Вы можете установить cron для вышеуказанного задания, как это, в настройках сельдерея

from celery.schedules import crontab

# ...

CELERY_BEAT_SCHEDULE = {
    "failed_task_cron": {
        "task": "path/to/tasks.failed_task_cron",
        "schedule": crontab(hour=1)           # Runs every hour
    }
}
...