Как я могу проверить on_failure в сельдерее - PullRequest
0 голосов
/ 28 мая 2018

У моей задачи сельдерея есть базовый класс, в котором реализован метод on_failure.

В моем тесте я исправил один из методов, к которому вызывается задача, чтобы вызвать исключение, но on_faliureникогда не вызывается.

Базовый класс

class BaseTask(celery.Task):
    abstract = True 

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("error")

Задание

@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
    logic.method(a, b)

Тест

@patch('tasks.logic.method')
def test_something(self, mock):
    # arrange
    mock.side_effect = NotImplementedError

    # act
    with self.assertRaises(NotImplementedError):
        multiple(1, 2)

При запуске сельдерея и исключении поднимается всеработает отлично.CELERY_ALWAYS_EAGER активирован.

как я могу заставить on_faliure бежать?

1 Ответ

0 голосов
/ 24 октября 2018

Из обсуждения по проблеме в сельдерее GitHub : on_failure test is "уже выполнено на уровне Celery (проверка, вызывается ли on_failure)" and "написать тест, чтобы проверить, что вместо этого делает ваш on_failure ".Вы можете определить функцию внутри метода on_failure и протестировать ее, или вызвать on_failure как метод класса:

import TestCase
from billiard.einfo import ExceptionInfo

class TestTask(TestCase):
    def test_on_failure(self):
        "Testing on failure method"

        exc = Exception("Test")
        task_id = "test_task_id"
        args = ["argument 1"]
        kwargs = {"key": "value"}
        einfo = ExceptionInfo

        # call on_failure method
        multiple.on_failure(exc, task_id, args, kwargs, einfo)

        # assert something appened

ExceptionInfo - это тот же тип объекта, который используется celery;multiple это ваша задача, как вы определили ее в своем вопросе.

Надеюсь, это поможет

...