Как убедиться, что задание Celery было поставлено в очередь в Pytest? - PullRequest
1 голос
/ 01 июля 2019

У меня есть конечная точка API для регистрации нового пользователя. «Приветственное письмо» будет поставлено в очередь и выполнит эту задачу асинхронно. У меня есть 2 модульных теста для проверки:

  1. Api сохраняет информацию о пользователе в БД. OK
  2. Задача Celery отправляет электронное письмо с правильным содержанием + шаблон

Я хочу добавить 3-й модульный тест , чтобы обеспечить"Конечная точка должна поставить в очередь отправку электронной почты после сохранения формы пользователя в БД"

Я пытаюсь с сельдереем. AsyncResult, но он просит меня запустить рабочий. Более того, даже если работник готов, мы все еще не можем проверить, была ли поставлена ​​задача в очередь или нет, потому что неоднозначное состояние PENDING:

  1. Задача существует в очереди, но еще не выполнена: PENDING
  2. Задача не существует в очереди: В ОЖИДАНИИ

Кто-нибудь сталкивается с этой проблемой? Как мне это решить?

1 Ответ

0 голосов
/ 01 июля 2019

Распространенным способом решения этой проблемы в средах тестирования является использование параметра конфигурации task_always_eager , который в основном указывает Celery запускать задачу как обычную функцию.Вместо AsyncResult Celery создаст объект типа EagerResult, который ведет себя одинаково, но имеет совершенно другую логику выполнения.

...