У меня есть Celery shared_task в модуле tasks
, который выглядит следующим образом:
@shared_task
def task():
from core.something import send_it
send_it()
и я пишу тест, пытающийся patch
send_it
метод. Пока что у меня есть:
from ..tasks import task
class TestSend(TestCase):
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
@patch("core.tasks.send_it")
def test_task(self, send_it_mock):
task()
send_it_mock.assert_called_once()
Когда я запускаю это, я получаю ошибку: AttributeError: <module 'core.tasks' from 'app/core/tasks.py'> does not have the attribute 'send_it'
Из отчаяния я использовал @patch("tasks.task.send_it")
вместо этого, поскольку импорт происходит внутри shared_task, но я получаю аналогичный результат. Кто-нибудь знает, как я могу эффективно пропатчить вызов send_it
? Спасибо!