У моей задачи сельдерея есть базовый класс, в котором реализован метод on_failure
.
В моем тесте я исправил один из методов, к которому вызывается задача, чтобы вызвать исключение, но on_faliure
никогда не вызывается.
Базовый класс
class BaseTask(celery.Task):
abstract = True
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("error")
Задание
@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
logic.method(a, b)
Тест
@patch('tasks.logic.method')
def test_something(self, mock):
# arrange
mock.side_effect = NotImplementedError
# act
with self.assertRaises(NotImplementedError):
multiple(1, 2)
При запуске сельдерея и исключении поднимается всеработает отлично.CELERY_ALWAYS_EAGER
активирован.
как я могу заставить on_faliure
бежать?