Самостоятельный триггер DAG воздушного потока в контуре - PullRequest
0 голосов
/ 10 октября 2019

Я хочу запустить dag, который содержит 2 задачи в цикле, скажем, 1..10. Одним из способов является то, что я могу динамически генерировать задачи с различными идентификаторами задач в dag. Следовательно, в конце будет 40 заданий, которые я не хочу. Поэтому я создал TriggerDagRunOperator, который будет называть себя dag. Проблема в том, что я не могу понять, как настроить обновление счетчика, чтобы при следующем запуске я мог получить обновленное значение счетчика. Есть ли что-то, где я могу поделиться информацией для следующего экземпляра? Ниже приведен код:

import airflow
from airflow import DAG
from airflow.operators import BashOperator,PythonOperator
from datetime import datetime, timedelta
from airflow.operators.dagrun_operator import TriggerDagRunOperator

today = datetime.today()

dag = DAG(dag_id='main_dag', default_args={
        "owner": "airflow",
        "start_date": airflow.utils.dates.days_ago(2),
    },
    schedule_interval=None,
   )

# t1, t2 are examples of tasks created using operators




bash_task_1 = BashOperator(
    task_id="bash_task_1",
    bash_command='echo "Here is the message1: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag)

bash_task_2 = BashOperator(
    task_id="bash_task_2",
    bash_command='echo "Here is the message2: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag)


def run_this_func(**kwargs):
    """
    Print the payload "message" passed to the DagRun conf attribute.
    :param dict kwargs: Context
    """
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['counter']))
    return kwargs['dag_run'].conf['counter']    


def conditionally_trigger(context, dag_run_obj):
    """
    This function decides whether or not to Trigger the remote DAG
    """             
    counter = context['params']['counter']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    while counter < 3:
       counter += 1 # Where should i set this back to be available for next instance      
       return dag_run_obj
    return None   


trigger = TriggerDagRunOperator(
     task_id='trigger_job1',
     trigger_dag_id="main_dag",
     python_callable=conditionally_trigger,
     params={'counter': 1, 'message': 'Hello'},
     dag=dag)

bash_task_1 >> bash_task_2 >> trigger
...