Как создать DAG из задачи в Airflow - PullRequest
0 голосов
/ 27 апреля 2018

У меня есть требование: есть родительский DAG только с одной задачей, которая создаёт определённые параметры (нефиксированные). Давайте назовём их params1, params2 и params3. Теперь я хочу создать три DAG из задачи в родительском DAG, и у каждого из них эти параметры должны быть доступны в контексте (context) каждой его задачи. Я следовал приведённой ниже ссылке, чтобы создавать DAG динамически, и попробовал следующее -

https://airflow.incubator.apache.org/faq.html#how-can-i-create-dags-dynamically

class ParentBigquerySql(object):
    """Parent job: reads batch date ranges with a SQL query and publishes them via XCom."""

    def __init__(self):
        pass

    def run(self, **context):
        """Query the batch date ranges, push them to XCom and return them.

        :param context: task context; forwarded unchanged to
            ``XcomManager.push_query_params``.
        :return: the dict produced by :meth:`get_params`.
        """
        logging.info('Running job')
        batch_id = 100
        #parent_sql = '''SELECT max(run_start_date) AS run_start_date,
        #                max(run_end_date) AS run_end_date
        #                FROM `vintel_rel_2_0_staging_westfield.in_venue_batch_dates_daily`'''
        parent_sql = '''SELECT run_start_date, run_end_date 
                        from vintel_rel_2_0_staging_westfield.in_venue_batch_dates_daily
                        order by 1 ,2'''
        params = self.get_params(batch_id, parent_sql)
        XcomManager.push_query_params(context, params)
        return params

    def get_params(self, batch_id, parent_sql):
        """Run ``parent_sql`` and collect its rows into date-range parameters.

        :param batch_id: kept for interface compatibility; not used by the query.
        :param parent_sql: query whose first two columns are read as the start
            and end date of a range.
        :return: ``{'date_range': [{'min_date': ..., 'max_date': ...}, ...]}``.
            The list is empty when the query failed or yielded no usable rows,
            so callers can always index ``['date_range']``.
        """
        # NOTE(review): presumably returns either a list of rows or a job-like
        # object exposing ``error_result`` -- confirm against BigQueryManager.
        result = BigQueryManager.read_query_to_table(parent_sql)
        date_ranges = []

        error = None
        if result and not isinstance(result, list):
            error = getattr(result, 'error_result', None)

        if error:
            # Surface the failure instead of silently dropping it.
            logging.error("Error in running the parent jobs - %s.", error)
        elif result:
            for row in result:
                # Both columns are indexed below, so require at least two.
                if len(row) >= 2:
                    run_start_date = row[0]
                    run_end_date = row[1]
                    if run_start_date and run_end_date:
                        date_ranges.append({'min_date': run_start_date,
                                            'max_date': run_end_date})

        return {'date_range': date_ranges}


# Module-level (not a class attribute) so the DAG definition below can see it.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 23),
    'retries': 1,
    'provide_context': True,
    'retry_delay': timedelta(minutes=2),
}

# Parent DAG: runs once; its tasks inherit their settings from ``default_args``.
dag = DAG(
    dag_id='parent_dynamic_job_dag',
    default_args=default_args,
    schedule_interval='@once',
)

def pull_child11(**context):
    """Log the ``date_range`` entry received through the keyword arguments.

    Raises ``KeyError`` when no ``date_range`` keyword was supplied.
    """
    date_range = context['date_range']
    message = " Date range " + str(date_range)
    logging.info(message)

def conditionally_trigger(context, dag_run_obj):
    """Return ``dag_run_obj`` unchanged; ``context`` is accepted but not inspected."""
    return dag_run_obj

def create_dag_from_task(**context):
    """Run the parent job and build one child DAG per date range (at most three).

    Each child DAG ``child_dag_id<N>`` gets a ``child<N>`` task that logs its
    date range. Every child except the last also gets a trigger task for
    ``child_dag_id<N+1>``; the last child has no trigger, because the DAG it
    would point at is never created.

    :param context: task context; must contain ``execution_date``.

    NOTE(review): the DAG objects are placed in this module's ``globals()``
    only inside the process that executes this task. A DagBag that re-imports
    the file (as the log lines "Filling up the DagBag" show happening) will not
    have run this function, so it presumably cannot find ``child_dag_id*`` --
    consistent with the ``'NoneType' object has no attribute 'create_dagrun'``
    failure. DAGs normally have to exist at module import time; confirm and
    restructure if the child DAGs must be triggerable.
    """
    job = ParentBigquerySql()
    job.run(**context)
    logging.info("Context data")
    logging.info(context)
    params = XcomManager.pull_query_params(context)
    logging.info("Xcomm parameters: " + str(params))

    if not params or not params.get('date_range'):
        logging.warning("No date ranges available; no child DAGs created")
        return

    # Same cap as before: never build more than three child DAGs.
    max_children = 3
    date_ranges = list(params['date_range'])[:max_children]

    for counter, d1 in enumerate(date_ranges, 1):
        dyn_dag_id = 'child_dag_id' + str(counter)
        dag_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': context['execution_date'],
            'execution_date': context['execution_date'],
            'retries': 1,
            'provide_context': True,
            'retry_delay': timedelta(minutes=2),
        }
        dyn_dag = DAG(dyn_dag_id,  # give the dag a name
                      schedule_interval='@once',
                      default_args=dag_args
                      )
        t1 = PythonOperator(
            task_id='child' + str(counter),
            dag=dyn_dag,
            provide_context=True,
            python_callable=pull_child11,
            op_kwargs={'dag_id': 10, 'date_range': d1}
        )
        # Chain to the next child only when that child will actually be built.
        if counter < len(date_ranges):
            t2 = TriggerDagRunOperator(task_id='test_trigger_dag',
                                       trigger_dag_id='child_dag_id' + str(counter + 1),
                                       python_callable=conditionally_trigger,
                                       dag=dyn_dag)
            t1.set_downstream(t2)
        logging.info("Updating globals for the dag " + dyn_dag_id)
        # Assign the DAG object to the module's global namespace.
        globals()[dyn_dag_id] = dyn_dag


# Step 1: run the parent job and build the child DAG objects.
push1 = PythonOperator(
    dag=dag,
    task_id='100-Parent',
    python_callable=create_dag_from_task,
    provide_context=True,
    op_kwargs={'dag_id': 100},
)

# Step 2: log a placeholder date range.
push11 = PythonOperator(
    dag=dag,
    task_id='101-Child',
    python_callable=pull_child11,
    provide_context=True,
    op_kwargs={
        'dag_id': 100,
        'date_range': {'start_date': 'temp_start_date', 'end_date': 'temp_end_date'},
    },
)

# Step 3: trigger the first child DAG by id.
# NOTE(review): 'child_dag_id1' is only created inside create_dag_from_task at
# run time -- confirm it is visible to the DagBag this operator loads.
t2 = TriggerDagRunOperator(
    dag=dag,
    task_id='test_trigger_dag',
    trigger_dag_id='child_dag_id1',
    python_callable=conditionally_trigger,
)

# Execution order: push1 -> push11 -> t2.
push1 >> push11 >> t2

Я получаю следующую ошибку -

    [2018-05-01 09:24:27,764] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-05-01 09:24:27,875] {models.py:189} INFO - Filling up the DagBag from /mnt/test_project /airflow/dags
[2018-05-01 09:25:02,074] {models.py:1197} INFO - Dependencies all met for <TaskInstance: parent_dynamic_job_dag.test_trigger_dag 2018-04-23 00:00:00 [up_for_retry]>
[2018-05-01 09:25:02,081] {base_executor.py:49} INFO - Adding to queue: airflow run parent_dynamic_job_dag test_trigger_dag 2018-04-23T00:00:00 --local -sd DAGS_FOLDER/test_dynamic_parent_child.py
[2018-05-01 09:25:07,003] {sequential_executor.py:40} INFO - Executing command: airflow run parent_dynamic_job_dag test_trigger_dag 2018-04-23T00:00:00 --local -sd DAGS_FOLDER/test_dynamic_parent_child.py
[2018-05-01 09:25:08,235] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-05-01 09:25:08,431] {models.py:189} INFO - Filling up the DagBag from /mnt/test_project /airflow/dags/test_dynamic_parent_child.py
[2018-05-01 09:26:44,207] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run parent_dynamic_job_dag test_trigger_dag 2018-04-23T00:00:00 --job_id 178 --raw -sd DAGS_FOLDER/test_dynamic_parent_child.py']
[2018-05-01 09:26:45,243] {base_task_runner.py:98} INFO - Subtask: [2018-05-01 09:26:45,242] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-05-01 09:26:45,416] {base_task_runner.py:98} INFO - Subtask: [2018-05-01 09:26:45,415] {models.py:189} INFO - Filling up the DagBag from /mnt/test_project /airflow/dags/test_dynamic_parent_child.py
[2018-05-01 09:27:49,798] {base_task_runner.py:98} INFO - Subtask: [2018-05-01 09:27:49,797] {models.py:189} INFO - Filling up the DagBag from /mnt/test_project /airflow/dags
[2018-05-01 09:27:50,108] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-05-01 09:27:50,108] {base_task_runner.py:98} INFO - Subtask:   File "/Users/manishz/anaconda2/bin/airflow", line 27, in <module>
[2018-05-01 09:27:50,109] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-05-01 09:27:50,109] {base_task_runner.py:98} INFO - Subtask:   File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-05-01 09:27:50,110] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-05-01 09:27:50,110] {base_task_runner.py:98} INFO - Subtask:   File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-05-01 09:27:50,110] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-05-01 09:27:50,111] {base_task_runner.py:98} INFO - Subtask:   File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-05-01 09:27:50,111] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-05-01 09:27:50,112] {base_task_runner.py:98} INFO - Subtask:   File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/operators/dagrun_operator.py", line 67, in execute
[2018-05-01 09:27:50,112] {base_task_runner.py:98} INFO - Subtask:     dr = trigger_dag.create_dagrun(
[2018-05-01 09:27:50,112] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'NoneType' object has no attribute 'create_dagrun'
[2018-05-01 09:28:14,407] {jobs.py:2521} INFO - Task exited with return code 1
[2018-05-01 09:28:14,569] {jobs.py:1959} ERROR - Task instance <TaskInstance: parent_dynamic_job_dag.test_trigger_dag 2018-04-23 00:00:00 [failed]> failed
[2018-05-01 09:28:14,573] {models.py:4584} INFO - Updating state for <DagRun parent_dynamic_job_dag @ 2018-04-23 00:00:00: backfill_2018-04-23T00:00:00, externally triggered: False> considering 3 task(s)
[2018-05-01 09:28:14,576] {models.py:4631} INFO - Marking run <DagRun parent_dynamic_job_dag @ 2018-04-23 00:00:00: backfill_2018-04-23T00:00:00, externally triggered: False> failed
[2018-05-01 09:28:14,600] {jobs.py:2125} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 | kicked_off: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
Traceback (most recent call last):
  File "/Users/manishz/anaconda2/bin/airflow", line 27, in <module>
    args.func(args)
  File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 185, in backfill
    delay_on_limit_secs=args.delay_on_limit)
  File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/models.py", line 3724, in run
    job.run()
  File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/jobs.py", line 198, in run
    self._execute()
  File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/jobs.py", line 2441, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
Some task instances failed:
%s

Но приведенный выше код не запускает следующие пакеты. Есть идеи, что здесь происходит?

Заранее спасибо Manish

...