Правило триггера задачи обработки ошибок воздушного потока для запуска сбоя задачи, связанной напрямую - PullRequest
1 голос
/ 08 марта 2019

У меня есть конвейер задачи воздушного потока, как на диаграмме.task1_error_handler & task2_error_handler - это задачи обработки ошибок, которые следует запускать только в случае сбоя непосредственно связанной задачи.Я установил правило запуска ONE_FAILED для этих задач.Но кажется ошибка на task1 запускает оба обработчика ошибок.Мне нужно только вызвать task1_error_handler.Все задачи являются пользовательскими операторами, а идентификаторы задач, заканчивающиеся на status, являются пользовательскими датчиками.Как мне этого добиться? enter image description here

Ответы [ 2 ]

1 голос
/ 08 марта 2019

Ошибка на task1 вызывает оба обработчика ошибок, поскольку task2 находится ниже task1, что делает task1 родителем задачи task2.

Если ваше правило триггера равно ONE_FAILED для task1 и task2, это вызывает проблемы, поскольку определение ONE_FAILED:

срабатывает, как толькотак как хотя бы один из родителей потерпел неудачу, он не ждет, пока все родители будут выполнены

Таким образом, с учетом сказанного, вы хотите, чтобы task1_error_handler срабатывал только в случае сбоя task1.К сожалению, это невозможно сделать, просто изменив правило триггера, поскольку вы не можете напрямую связать условную задачу, как вы хотите в данный момент.

Ваши лучшие ставки будут такими:

  • Оставьте задачу 1 такой, как есть, и избавьтесь от правила триггера task2 для обработчика ошибок и вместо этого используйте on_failure_callback для вызова ошибки.обработчик.
  • Разделить задачу 2 на отдельный DAG.
0 голосов
/ 11 марта 2019

Примечание. Предложенное решение может быть неправильным, но вы все равно поймете, что я пытаюсь добиться того, что Логические вентили делают в цифровых цепях , что может помочь вам найти работающеерешение.Я призываю вас оставить отзыв


@ ответ Зака ​​ очень точно указывает на проблему.Я просто хотел бы добавить обходной путь, который я имею в виду.

Если мы создадим несколько новых фиктивных задач и зависимостей следующим образом, это может просто помочь.

  1. A DummyOperator с trigger_rule=ONE_FAILED вместо task2_error_handler.Его успех означает, что task2 потерпел неудачу (что вполне может быть из-за отказа task1)

    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils.trigger_rule import TriggerRule
    ..
    task2_dummy_error_handler = DummyOperator(
                                  dag=my_dag,
                                  task_id="task2_dummy_error_handler",
                                  trigger_rule=TriggerRule.ONE_FAILED
                                )
    [task2, task2_status_check] >> task2_dummy_error_handler
    
  2. Другой DummyOperator с trigger_rule=ALL_SUCCESS, который сообщает, был ли task1 успешным или нет .Его сбой будет означать, что task1 не удалось => task2 будет автоматически отказывать из-за UPSTREAM_FAILED, следовательно, нам не нужно запускать task2_retry_handler

    task1_error_handler_status_check = DummyOperator(
                                         dag=my_dag,
                                         task_id="task1_error_handler_status_check",
                                         trigger_rule=TriggerRule.ALL_SUCCESS
                                       )
    [task1, task1_status_check] >> task1_error_handler_status_check
    
  3. Наконец, установите trigger_rule=ALL_SUCCESS в вашем task2_retry_handler и сделайте его ниже по потоку из двух вышеупомянутых фиктивных задач.Это должно гарантировать, что task2_retry_handler работает при сбое task2, но не при сбое task1.

    task2_retry_handler = PythonOperator(
                            dag=my_dag,
                            task_id="task2_retry_handler",
                            python_callable=my_task2_retry_handler,
                            ..,
                            trigger_rule=TriggerRule.ALL_SUCCESS
                          )
    [task1_error_handler_status_check, task2_dummy_error_handler] >> task2_retry_handler
    

Ссылки

...