Как проверить очередь обработки задачи сельдерея - PullRequest
2 голосов
/ 28 мая 2020

Сейчас я использую сельдерей для периодических c задач. Я новичок в сельдерее. У меня двое рабочих работают в двух разных очередях. Один для медленных фоновых заданий и один для очереди пользователя заданий в приложении.

Я отслеживаю свои задачи в datadog, потому что это простой способ подтвердить, что мои рабочие работают должным образом.

Что я хочу сделать, так это после завершения каждой задачи записывать, в какой очереди задача была завершена.

@after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
    statsd.increment("celery.on_task_publish.start.increment")

    task = celery.tasks.get(sender)
    queue_name = task.queue

    statsd.increment("celery.on_task_publish.increment", tags=[f"{queue_name}:{task}"])

Следующая функция - это то, что я реализовал после исследования документации по сельдерею и некоторых других. StackOverflow публикует сообщения, но он не работает должным образом. Я получаю первое приращение statsd, но оставшийся код не выполняется.

Мне интересно, есть ли более простой способ проверить внутри / после завершения каждой задачи, какая очередь обрабатывала задачу.

1 Ответ

0 голосов
/ 28 мая 2020

Поскольку в вашем вопросе указано , есть ли способ проверить внутри / после завершения каждой задачи - я предполагаю, что вы не пробовали этот бэкэнд с сельдереем-результатом. Итак, вы можете проверить эту функцию, которая предоставляется самим Celery: Celery-Result-Backend / Task-result-Backend. Это очень полезно для хранения результатов ваших задач с сельдереем. Прочтите это => https://docs.celeryproject.org/en/stable/userguide/configuration.html#task -result-backend-settings


Как только вы получите представление о том, как настроить этот результат-backend, найдите ключ result_extended ( в той же ссылке), чтобы иметь возможность добавлять queue-names в возвращаемые значения вашей задачи.

Количество доступных опций - например, вы можете настроить эти результаты на go на любой из них:

Sql-DB / NoSql-DB / S3 / Azure / Elasticsearch / etc 

Я использовал эту Result-Backend функцию с Elasticsearch, и вот как хранятся результаты моей задачи:

enter image description here

Это просто вопрос добавления нескольких конфигураций в файл settings.py в соответствии с вашими требованиями. Очень хорошо сработал для моего приложения. И у меня есть еженедельный cron, который очищает только successful results задач - поскольку нам больше не нужны результаты - а я вижу только failed results (как на изображении).

Это были основные ключи для моего требования: task_track_started и task_acks_late вместе с result_backend

...