Я пытаюсь добавить dag для воздушного потока, динамически циклически перебирая ключи словаря и назначая ключи в качестве имени dag.
dags работают хорошо, но я получаю: «Этот DAG недоступен в объекте DagBag веб-сервера. Он отображается в этом списке, потому что планировщик пометил его как активный в базе данных метаданных "и он не активен.
def create_dag(dag_id):
args = build_default_args(config_file)
dag = DAG(dag_id,schedule_interval='30 11 * * *', default_args=args)
with dag:
init_task = BashOperator(
task_id='test_init_task',
bash_command='echo "task"',
dag=dag
)
init_task
return dag
def get_data(**kwargs):
my_list=[]
file = open("/home/airflow/gcs/data/test.json")
data=json.load(file)
return data
data1 = data()
for dict in data1:
for pair in dict.items():
key , value = pair
print "key",ls_table ,"value",metrics
dag_id = '{}'.format(key)
default_args = {'owner': 'airflow',
'start_date': datetime(2019, 6, 18)
}
schedule = '@daily'
globals()[dag_id] = create_dag(dag_id)