Скажи, что я бегу:
t = BashOperator(
task_id='import',
bash_command="""python3 script.py '{{ next_execution_date }}' """,
dag=dag)
И по какой-то причине я хочу, чтобы скрипт завершил работу с ошибкой и указал поток воздуха, что он должен повторить эту задачу.
Я пытался использовать os._exit(1)
, но Airflow пометил задачу как успешную.
Я знаю, что есть:
from airflow.exceptions import AirflowException
raise AirflowException("error msg")
Но это больше для функций, написанных в DAG. Мой сценарий независим, и иногда мы запускаем его один, независимо от потока воздуха.
Также скрипт Python3
, когда Airflow работает под Python 2.7
Кажется чрезмерным устанавливать Airflow на Python3
только для обработки ошибок.
Это другое решение?