Что и где я высмеиваю, с потоком воздуха PythonOperator
, таким что:
-
python_callback
вызывает исключение, вызывая вызов on_failure_callback
и
- Я могу проверить, вызывается ли этот обратный вызов и с какими аргументами?
Я пытался издеваться над {python_callable} и PythonOperator.execute
в нескольких местах, но безуспешно.
Файлы кода выглядят примерно так:
пантов / my_code.py
class CustomException(Exception): pass
def a_callable():
if OurSqlAlchemyTable.count() == 0:
raise CustomException("{} is empty".format(OurSqlAlchemyTable.name))
return True
def a_failure_callable(context):
SlackWebhookHook(
http_conn_id=slack_conn_id,
message= context['exception'].msg,
channel='#alert-channel'
).execute()
пантов / a_dag.py
from my_code import a_callable, a_failure_callable
new_task = PythonOperator(
task_id='new-task', dag=dag-named-sue, conn_id='a_conn_id', timeout=30,
python_callable=a_callable,
on_failure_callback=a_failure_callable)
пантов / test_a_dag.py
class TestCallback(unittest.TestCase):
def test_on_failure_callback(self):
tested_task = DagBag().get_dag('dag-named-sue').get_task('new-task')
with patch('airflow.operators.python_operator.PythonOperator.execute') as mock_execute:
with patch('dags.a_dag.a_failure_callable') as mock_callback:
mock_execute.side_effect = CustomException
tested_task.execute(context={})
# does failure of the python_callable trigger the failure callback?
mock_callback.assert_called()
# did the exception message make it to the failure callback?
failure_context = mock_callback.call_args[0]
self.assertEqual(failure_context['exception'].msg,
'OurSqlAlchemyTable is empty')O
Тест поднимает CustomException
в строке self.task.execute(context={})
- но в самом коде теста. Я хочу, чтобы эта ошибка была
возникает в коде воздушного потока, так что PythonOperator
не удается и вызывает on_failure_callback
.
Я пробовал любое количество перестановок, все они поднимались в тесте без
запуск, вызов python_callable или не поиск объекта для исправления:
patch('dags.a_dag.a_callable') as mock_callable
'a_dag.a_callable'
'dags.my_code.a_callable'
'my_code.a_callable'
'airflow.models.Task.execute'
(Python3
, pytest
и mock
.)
Что я пропускаю / делаю неправильно?
(Еще лучше, я хотел бы проверить аргументы, переданные SlackWebhookHook
. Что-то вроде:
with patch('???.SlackWebhookHook.execute') as mock_webhook:
... as above ...
kw_dict = mock_webhook.call_args[-1]
assert kw_dict['http_conn_id'] == slack_conn_id
assert kw_dict['message'] == 'OurSqlAlchemyTable is empty'
assert kw_dict['channel'] == '#alert-channel'
(Но я сначала сосредоточился на тестировании обратного вызова сбоя.)
Заранее спасибо.