Как издеваться над python_callable и on_failure_callback PythonOperator? - PullRequest
2 голосов
/ 07 марта 2019

Что и где я высмеиваю, с потоком воздуха PythonOperator, таким что:

  • python_callback вызывает исключение, вызывая вызов on_failure_callback и
  • Я могу проверить, вызывается ли этот обратный вызов и с какими аргументами?

Я пытался издеваться над {python_callable} и PythonOperator.execute в нескольких местах, но безуспешно.

Файлы кода выглядят примерно так:

пантов / my_code.py

class CustomException(Exception): pass

def a_callable():
    if OurSqlAlchemyTable.count() == 0:
        raise CustomException("{} is empty".format(OurSqlAlchemyTable.name))
    return True

def a_failure_callable(context):
    SlackWebhookHook(
        http_conn_id=slack_conn_id,
        message= context['exception'].msg,
        channel='#alert-channel'
        ).execute()

пантов / a_dag.py

from my_code import a_callable, a_failure_callable

new_task = PythonOperator(
    task_id='new-task', dag=dag-named-sue, conn_id='a_conn_id', timeout=30,
    python_callable=a_callable,
    on_failure_callback=a_failure_callable)

пантов / test_a_dag.py

class TestCallback(unittest.TestCase):

    def test_on_failure_callback(self):
        tested_task = DagBag().get_dag('dag-named-sue').get_task('new-task')

        with patch('airflow.operators.python_operator.PythonOperator.execute') as mock_execute:
            with patch('dags.a_dag.a_failure_callable') as mock_callback:
                mock_execute.side_effect = CustomException
                tested_task.execute(context={})

            # does failure of the python_callable trigger the failure callback?
        mock_callback.assert_called()

            # did the exception message make it to the failure callback?
        failure_context = mock_callback.call_args[0]
        self.assertEqual(failure_context['exception'].msg,
                        'OurSqlAlchemyTable is empty')O

Тест поднимает CustomException в строке self.task.execute(context={}) - но в самом коде теста. Я хочу, чтобы эта ошибка была возникает в коде воздушного потока, так что PythonOperator не удается и вызывает on_failure_callback.

Я пробовал любое количество перестановок, все они поднимались в тесте без запуск, вызов python_callable или не поиск объекта для исправления:

patch('dags.a_dag.a_callable') as mock_callable
      'a_dag.a_callable'
      'dags.my_code.a_callable'
      'my_code.a_callable'
      'airflow.models.Task.execute'

(Python3, pytest и mock.)

Что я пропускаю / делаю неправильно?

(Еще лучше, я хотел бы проверить аргументы, переданные SlackWebhookHook. Что-то вроде:

with patch('???.SlackWebhookHook.execute') as mock_webhook:
    ... as above ...

kw_dict = mock_webhook.call_args[-1]
assert kw_dict['http_conn_id'] == slack_conn_id
assert kw_dict['message'] == 'OurSqlAlchemyTable is empty'
assert kw_dict['channel'] == '#alert-channel'

(Но я сначала сосредоточился на тестировании обратного вызова сбоя.)

Заранее спасибо.

...