создать динамический цикл с помощью словаря - PullRequest
0 голосов
/ 20 июня 2019

Я пытаюсь добавить dag для воздушного потока, динамически циклически перебирая ключи словаря и назначая ключи в качестве имени dag.

dags работают хорошо, но я получаю: «Этот DAG недоступен в объекте DagBag веб-сервера. Он отображается в этом списке, потому что планировщик пометил его как активный в базе данных метаданных "и он не активен.

def create_dag(dag_id):
    args = build_default_args(config_file)
    dag = DAG(dag_id,schedule_interval='30 11 * * *', default_args=args)
    with dag:
        init_task = BashOperator(
                    task_id='test_init_task',
                    bash_command='echo "task"',
                    dag=dag
                )
        init_task
    return dag

def get_data(**kwargs):
    my_list=[]
    file = open("/home/airflow/gcs/data/test.json")
    data=json.load(file)
    return data

data1 = data()

for dict in data1:
    for pair in dict.items():
        key , value = pair
        print "key",ls_table ,"value",metrics
        dag_id = '{}'.format(key)
        default_args = {'owner': 'airflow',
                    'start_date': datetime(2019, 6, 18)
                    }
        schedule = '@daily'
        globals()[dag_id] = create_dag(dag_id)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...