Как вызвать сигналы сельдерея на тестах - PullRequest
1 голос
/ 09 июля 2019

Я пишу тесты для обработчика сигнала сельдерея task_failure, однако у меня есть проблема, чтобы вызвать этот сигнал во время тестов. Как я могу вызвать сигнал сельдерея вручную?

У меня есть следующий код:

tasks.py

def do_something():
    print "hello"

@app.task(name='simple_task')
def simple_task(x, y):
    do_something()
    return x - y

@task_failure.connect(sender='simple_task')
def task_failure_handler(args=None, **kwargs):
    logger.exception("Task failed")


@celeryd_init.connect
def init_signals(*args, **kwargs):
    task_failure.connect(
        task_failure_handler
        sender='simple_task'
    )

и я пытаюсь написать тесты, чтобы проверить, вызывается ли сигнал при сбое задачи.

Пока я сделал:

@patch('tasks.task_failure_handler')
@patch('tasks.do_something')
def test_signal_handler(task_mock, signal_mock):
    task_mock.side_effect = Exception()
    tasks.simple_task.apply((1, 2,))
    assert signal_mock.called

Но сигнал не срабатывает и утверждение не выполняется.

Я сделал некоторую отладку, и проблемы, похоже, связаны с self._live_receivers(sender) в celery/utils/dispatch/signal.py self._live_receivers(sender) пусто, несмотря на то, что получатель доступен, но ни одно из этих условий: r_senderkey == NONE_ID or r_senderkey == senderkey не соответствует действительности. Я не понимаю, почему это происходит.

Почему мой ресивер не живой? Как я могу запустить сигнал вручную?

...