Я хочу запустить dag, который содержит 2 задачи в цикле, скажем, 1..10. Одним из способов является то, что я могу динамически генерировать задачи с различными идентификаторами задач в dag. Следовательно, в конце будет 40 заданий, которые я не хочу. Поэтому я создал TriggerDagRunOperator, который будет называть себя dag. Проблема в том, что я не могу понять, как настроить обновление счетчика, чтобы при следующем запуске я мог получить обновленное значение счетчика. Есть ли что-то, где я могу поделиться информацией для следующего экземпляра? Ниже приведен код:
import airflow
from airflow import DAG
from airflow.operators import BashOperator,PythonOperator
from datetime import datetime, timedelta
from airflow.operators.dagrun_operator import TriggerDagRunOperator
today = datetime.today()
dag = DAG(dag_id='main_dag', default_args={
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(2),
},
schedule_interval=None,
)
# t1, t2 are examples of tasks created using operators
bash_task_1 = BashOperator(
task_id="bash_task_1",
bash_command='echo "Here is the message1: '
'{{ dag_run.conf["message"] if dag_run else "" }}" ',
dag=dag)
bash_task_2 = BashOperator(
task_id="bash_task_2",
bash_command='echo "Here is the message2: '
'{{ dag_run.conf["message"] if dag_run else "" }}" ',
dag=dag)
def run_this_func(**kwargs):
"""
Print the payload "message" passed to the DagRun conf attribute.
:param dict kwargs: Context
"""
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['counter']))
return kwargs['dag_run'].conf['counter']
def conditionally_trigger(context, dag_run_obj):
"""
This function decides whether or not to Trigger the remote DAG
"""
counter = context['params']['counter']
print("Controller DAG : conditionally_trigger = {}".format(c_p))
while counter < 3:
counter += 1 # Where should i set this back to be available for next instance
return dag_run_obj
return None
trigger = TriggerDagRunOperator(
task_id='trigger_job1',
trigger_dag_id="main_dag",
python_callable=conditionally_trigger,
params={'counter': 1, 'message': 'Hello'},
dag=dag)
bash_task_1 >> bash_task_2 >> trigger