Я бы создал для этого один DAG и просто связал бы тесты вместе. Если тест не пройден, отметьте задачу как неудачную; стандартная конфигурация предназначена для того, чтобы зависимые задачи не запускались.
Когда вы генерируете DAG динамически, вам нужно быть уверенным, что все ваши задачи, однажды добавленные в DAG, останутся навсегда. Если тесты можно удалить, вы, вероятно, захотите создать для них фиктивную задачу. Из Рекомендации документации :
Удаление задачи
Никогда не удаляйте задачу из группы доступности базы данных. В случае удаления историческая информация о задаче исчезает из пользовательского интерфейса Airflow. Рекомендуется создать новую группу DAG на случай, если задачи необходимо удалить.
Это означает, что если тест удален и для теста не создается задача, то вы больше не можете видеть как этот удаленный тест проводился в прошлом. Если вместо этого вы поместили фиктивную задачу на их место, вы все равно сможете увидеть, как тест выполнялся в прошлом до его удаления.
Принимая ваши требования:
- L oop через набор тестов
- Запустить тест.
- Результаты проверки
- Если тесты пройдены, переходите к следующему набору тестов.
- Если тесты терпят неудачу, записать информацию об ошибке (время, причина, какой тест был запущен) в базе данных.
- Отправьте сообщение о сбоях.
Шаг 1. генерируете ли вы DAG из вашего источника PyMon go (заботясь о замене удалил тест с пустышками, так что отслеживайте где-нибудь, какие задачи вы создали)
Шаг 2. запускает тест в задаче.
Шаг 3 - та же задача проверка результатов; если задача не удалась, установите состояние экземпляра задачи как сбой, вызвав исключение AirflowException
. Подойдет почти любое исключение, но AirFlowException("reason")
немного чище.
Шаг 4 - это задача, которая зависит от всех тестовых задач и выполняется безоговорочно (независимо от результатов теста), а также создает и отправляет электронное письмо как последний шаг в группе DAG. Вы хотите установить его trigger_rule
значение на all_done
, чтобы выполнить sh this; он будет запущен после того, как все задачи, от которых он зависит, завершились успешно, не прошли или были пропущены.
Обратите внимание, что задачи будут повторяться, если вы дадите им retries
значение больше 0; для ваших тестов вы, вероятно, захотите установить это значение в 0.
Итак, чтобы сгенерировать DAG, вы должны использовать:
test_tasks = []
last_test = None
for count, test_case in enumerate(test_list):
check_status_task = PythonOperator(
task_id='check_test_status_'+str(count),
python_callable=check_test_status,
op_kwargs={'test_case': test_case},
provide_context=True,
retries=0, # no retrying when it fails
dag=dag,
)
if test_tasks:
# add task to pipeline defined by all tasks running in series
check_status_task << test_tasks[-1]
test_tasks.append(check_status_task)
email_result_task = PythonOperator(
task_id='email_result',
python_callable=email_result,
provide_context=True,
# run when all parent tasks are done, regardless the outcome
trigger_rule='all_done',
dag=dag
)
test_tasks >> email_result_task
Приведенный выше код строит граф зависимостей, беря каждый предыдущий test (test_tasks[-1]
) в качестве восходящей задачи для только что сгенерированной в l oop. Почтовая задача зависит от всего списка тестовых задач.
Обратите внимание, что, поскольку вы устанавливаете provide_context=True
и предоставляете словарь op_kwargs
, вы можете просто предоставить своим операторам прямые аргументы для любого конкретного c контекста или op_kwargs
имен, к которым вам нужен доступ в вашей функции. Тогда check_test_status()
становится:
from airflow import AirflowException
def check_test_status(test_case, **kwargs):
# run the test, get the result
...
if test_failed:
# log failure in database
...
# then mark this task as failed
raise AirflowException("Test <testcase> failed: ...")
Строка raise AirflowException()
приводит к тому, что эта тестовая задача помечается как неудачная, а все другие тестовые задачи в вашей группе DAG не будут выполняться сейчас, потому что они используют правило триггера по умолчанию trigger_rule='all_success'
.
Если вместо этого вы используете return
в своей функции PythonOperator, то возвращаемое значение просто отбрасывается, если вы не используете xcom_push=True
в задаче, после чего он помещается в специальную переменную XCom для этой задачи, airfow.models.XCOM_RETURN_KEY
. Лично я предпочитаю использовать специальные переменные Xcom, особенно когда вам также нужно помечать задачи как неуспешные, вызывая исключения.
Итак, для последней задачи электронной почты вы можете использовать переменную XCom
для передачи информации из любого тестовые задания, которые выполнялись и далее; все, что можно мариновать или сериализовать в JSON (в зависимости от конфигурации Airflow), можно передать таким образом.
XCOM_TEST_STATUS_KEY = "check_test_status"
def check_test_status(test_case, ti, **kwargs):
# run the test, get the result
...
test_status = {
# ... information to share with the mail task
}
ti.xcom_push(XCOM_TEST_STATUS_KEY, test_status)
if test_failed:
# log failure in database
...
# then mark this task as failed
raise AirflowException("Test <testcase> failed: ...")
, а затем в задаче электронной почты:
def email_result(dag, ti, **kwargs):
test_tasks = [id for id in dag.task_ids if id.starts_with('check_test_status_')]
test_results = ti.xcom_pull(
task_ids=test_tasks,
key=XCOM_TEST_STATUS_KEY,
)
# process test_results to produce an email