Я создаю скрипт на python для генерации групп DAG (сгенерировать новый файл python со спецификациями DAG) из шаблонов.Все работает нормально, за исключением того, что мне нужно, чтобы DAG генерировался как без паузы.
Я искал и пытался запускать команды оболочки в сценарии следующим образом:
bash_command1 = 'airflow list_dags'
bash_command2 = 'airflow trigger_dag ' + str(DAG_ID)
bash_command3 = 'airflow list_tasks ' + str(DAG_ID)
bash_command4 = 'airflow unpause '+ str(DAG_ID)
subprocess.call(bash_command1.split())
subprocess.call(bash_command2.split())
subprocess.call(bash_command3.split())
subprocess.call(bash_command4.split())
Но каждый раз, когда я создаю новую группу обеспечения доступности баз данных, она отображается как приостановленная в веб-интерфейсе пользователя.В результате проведенного исследования команда airflow unpause <dag_id>
должна решить проблему, но когда скрипт выполняет ее, я получаю сообщение об ошибке:
Traceback (most recent call last):
File "/home/cubo/anaconda2/bin/airflow", line 28, in <module>
args.func(args)
File "/home/cubo/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 303, in unpause
set_is_paused(False, args, dag)
File "/home/cubo/anaconda2/lib/python2.7/site- packages/airflow/bin/cli.py", line 312, in set_is_paused
dm.is_paused = is_paused
AttributeError: 'NoneType' object has no attribute 'is_paused'
Но когда я выполняю ту же команду airflow unpause <dag_id>
втерминал, он работает нормально, и он печатает:
Dag: <DAG: DAG_ID>, paused: False
Любая помощь будет принята с благодарностью.