Я пишу тесты для обработчика сигнала сельдерея task_failure
, однако у меня есть проблема, чтобы вызвать этот сигнал во время тестов.
Как я могу вызвать сигнал сельдерея вручную?
У меня есть следующий код:
tasks.py
def do_something():
print "hello"
@app.task(name='simple_task')
def simple_task(x, y):
do_something()
return x - y
@task_failure.connect(sender='simple_task')
def task_failure_handler(args=None, **kwargs):
logger.exception("Task failed")
@celeryd_init.connect
def init_signals(*args, **kwargs):
task_failure.connect(
task_failure_handler
sender='simple_task'
)
и я пытаюсь написать тесты, чтобы проверить, вызывается ли сигнал при сбое задачи.
Пока я сделал:
@patch('tasks.task_failure_handler')
@patch('tasks.do_something')
def test_signal_handler(task_mock, signal_mock):
task_mock.side_effect = Exception()
tasks.simple_task.apply((1, 2,))
assert signal_mock.called
Но сигнал не срабатывает и утверждение не выполняется.
Я сделал некоторую отладку, и проблемы, похоже, связаны с self._live_receivers(sender)
в celery/utils/dispatch/signal.py
self._live_receivers(sender)
пусто, несмотря на то, что получатель доступен, но ни одно из этих условий: r_senderkey == NONE_ID or r_senderkey == senderkey
не соответствует действительности. Я не понимаю, почему это происходит.
Почему мой ресивер не живой? Как я могу запустить сигнал вручную?