Как установить зависимость задачи на основе параметров зависимости из файла json в задаче и динамически генерировать эти задачи в воздушном потоке - PullRequest
0 голосов
/ 19 июня 2020

Мне нужно создать такой рабочий процесс, в котором задачи определены в файле json с их зависимой задачей и на основе зависимого условия в параметре задачи, эта задача должна создаваться динамически. например,

Мой Json файл

{
  "workflow_name": "workflow_dag",
  "schedule": "@once",
  "tasks": [
    {
         "id": "task1",
         "name": "vipul",
         "dependson": ""
    },
    {
         "id": "task2",
         "name": "ajay",
         "dependson": "task1"
    },
    {
         "id": "task3",
         "name": "ajay",
         "dependson": "task1"

    },
    {
         "id": "task4",
         "name": "prakash",
         "dependson": "task2"
    },
    {
         "id": "task5",
         "name": "shivendra",
         "dependson": "task3"
    }

  ]
} 

что-то, что у меня есть, но он генерирует задачи последовательно из файла json.

def create_dag(dag_id,
    schedule,
    default_args,
    conf,
    ):

    dag = DAG(dag_id,default_args=default_args,
              schedule_interval=schedule)
    list = []
    with dag:

        for flow in conf['flows']:
            for (key, value) in flow.items():
                if key=='id':
                    tab = DummyOperator(
                                task_id=value, 
                                dag=dag,
                                )
                    list.append(tab)
                    break

        if len(list) > 1:
            for i in range(0, len(list)):
                if i + 1 <= len(list)-1:
                    list[i] >> list[i + 1]

        return dag


with open('/home/vipul/airflow-workspace/airflow_home/dags/wk_config.json'
          ) as json_data:
    conf = json.load(json_data)
    schedule = conf['schedule']
    dag_id = conf['workflow_name']

args = {
    'owner': 'vipul',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 19),
    'email': ['vipul@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'concurrency': 1,
    'max_active_runs': 1,
    }

globals()[dag_id] = create_dag(dag_id, schedule, args, conf)

мой вывод должно быть как

 task1 ->task2 -> task4
       ->task3 -> task5

Может кто-нибудь помочь с этим.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...