I'm new to Airflow and I'm trying to trigger two external DAGs from my parent DAG. I used TriggerDagRunOperator to trigger an external DAG, but the problem is that it triggers the second external DAG before the first external DAG has finished. To avoid this, I added an ExternalTaskSensor, but I can't get it to work correctly. My code is below.
Parent DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta
from time import gmtime, strftime
import pprint
import airflow

pp = pprint.PrettyPrinter(indent=4)
def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to trigger the remote DAG."""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if c_p:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj
    return None
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['aaaa@aaa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    # 'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
    'PARALLEL_TRIGGER_EXTERNAL_DAG',
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
)

# job1 runs first; each trigger/sensor pair is chained after it
create_command1 = "/usr/local/airflow/dags/basant/job1.sh "
t1 = BashOperator(
    task_id='job1',
    bash_command=create_command1,
    dag=dag,
)
trigger1 = TriggerDagRunOperator(
    task_id='trigger-test-dag-1',
    trigger_dag_id='PARALLEL_EXECUTION_TEST_DAG_1',
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello World'},
    dag=dag,
)

sensor1 = ExternalTaskSensor(
    task_id='sensor-test-dag-1',
    external_dag_id='PARALLEL_EXECUTION_TEST_DAG_1',
    external_task_id='job2',
    execution_delta=None,
    dag=dag,
)

trigger2 = TriggerDagRunOperator(
    task_id='trigger-test-dag-2',
    trigger_dag_id='PARALLEL_EXECUTION_TEST_DAG_2',
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello World'},
    dag=dag,
)

sensor2 = ExternalTaskSensor(
    task_id='sensor-test-dag-2',
    external_dag_id='PARALLEL_EXECUTION_TEST_DAG_2',
    external_task_id='job4',
    execution_delta=None,
    dag=dag,
)

t1 >> trigger1 >> sensor1 >> trigger2 >> sensor2
First external DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from time import gmtime, strftime
import airflow

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['aaaa@aaa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(
    'PARALLEL_EXECUTION_TEST_DAG_1',
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
)
# t1 and t2 are tasks created by instantiating operators
create_command1 = "/usr/local/airflow/dags/basant/job1.sh "
t1 = BashOperator(
    task_id='job1',
    bash_command=create_command1,
    dag=dag,
)

create_command2 = "/usr/local/airflow/dags/basant/job2.sh "
t2 = BashOperator(
    task_id='job2',
    bash_command=create_command2,
    dag=dag,
)

t1 >> t2
Second external DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from time import gmtime, strftime
import airflow

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['aaaa@aaa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(
    'PARALLEL_EXECUTION_TEST_DAG_2',
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
)
# t1 and t2 are tasks created by instantiating operators
create_command1 = "/usr/local/airflow/dags/basant/job3.sh "
t1 = BashOperator(
    task_id='job3',
    bash_command=create_command1,
    dag=dag,
)

create_command2 = "/usr/local/airflow/dags/basant/job4.sh "
t2 = BashOperator(
    task_id='job4',
    bash_command=create_command2,
    dag=dag,
)

t1 >> t2
The intended flow looks like this: job1 -> trigger-test-dag-1 -> sensor-test-dag-1 -> trigger-test-dag-2 -> sensor-test-dag-2.
job1 and trigger-test-dag-1 run successfully, but sensor-test-dag-1 keeps poking forever, and in its log file I get messages like:
[2020-02-06 12:46:39,703] {{external_task_sensor.py:113}} INFO - Poking for PARALLEL_EXECUTION_TEST_DAG_1.job2 on 2020-02-06T12:14:45.605733+00:00 ...
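Note that the sensor is poking for the parent run's own execution date (2020-02-06T12:14:45), even though the poke itself happens half an hour later. If I understand the Airflow 1.10 source correctly, with execution_delta=None the sensor performs roughly the check sketched below on every poke; this is my simplified sketch of what ExternalTaskSensor.poke does, not the exact implementation, and external_task_succeeded is a name I made up:

from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State

@provide_session
def external_task_succeeded(execution_date, session=None):
    # Count SUCCESS instances of the external task whose execution_date
    # equals the execution_date of the DAG run doing the sensing.
    count = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == 'PARALLEL_EXECUTION_TEST_DAG_1',
            TaskInstance.task_id == 'job2',
            TaskInstance.state.in_([State.SUCCESS]),
            TaskInstance.execution_date == execution_date,
        )
        .count()
    )
    return count == 1

If that reading is right, my sensor can only succeed when the triggered run shares the parent's execution_date, but as far as I can tell TriggerDagRunOperator gives the new run its own current-time execution date, so the poke never matches. How do I chain the triggers and sensors correctly so that the second external DAG only starts after the first one has finished?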