Как запустить два внешних даг от моего родительского даг - PullRequest
0 голосов
/ 06 февраля 2020

Я новичок в Airflow и пытаюсь запустить два внешних dag из моего родительского dag. Я использовал TriggerDagRunOperator для запуска внешнего dag, но проблема в том, что он вызывает второй внешний dag до завершения первого внешнего dag. Чтобы избежать этого, я использовал ExternalTaskSensor, но я не могу правильно реализовать. Ниже мой код.


Родительский Dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from time import gmtime, strftime
import pprint
import airflow
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import ExternalTaskSensor

pp = pprint.PrettyPrinter(indent=4)
def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj
    return None

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['aaaa@aaa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
    #'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'PARALLEL_TRIGGER_EXTERNAL_DAG',
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1)

# t1, t2 and t3 are examples of tasks created by instantiating operators
create_command1 = "/usr/local/airflow/dags/basant/job1.sh "
t1 = BashOperator(
    task_id='job1',
    bash_command=create_command1,
    dag=dag
    )

trigger1 = TriggerDagRunOperator(
    task_id='trigger-test-dag-1',
    trigger_dag_id="PARALLEL_EXECUTION_TEST_DAG_1",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello World'},
    dag=dag,
)
sensor1 = ExternalTaskSensor(
    task_id='sensor-test-dag-1',
    external_dag_id='PARALLEL_EXECUTION_TEST_DAG_1',
    external_task_id= 'job2',
    execution_delta=None,
    dag=dag
)

trigger2 = TriggerDagRunOperator(
    task_id='trigger-test-dag-2',
    trigger_dag_id="PARALLEL_EXECUTION_TEST_DAG_2",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello World'},
    dag=dag,
)
sensor2 = ExternalTaskSensor(
    task_id='sensor-test-dag-2',
    external_dag_id='PARALLEL_EXECUTION_TEST_DAG_2',
    external_task_id= 'job4',
    execution_delta=None,
    dag=dag
)

t1 >> trigger1 >> sensor1 >> trigger2 >> sensor2

1-й Внешний Dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from time import gmtime, strftime
import airflow

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['aaaa@aaa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    'PARALLEL_EXECUTION_TEST_DAG_1',
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1)
# t1, t2 and t3 are examples of tasks created by instantiating operators
create_command1 = "/usr/local/airflow/dags/basant/job1.sh "
t1 = BashOperator(
    task_id='job1',
    bash_command=create_command1,
    dag=dag
        )
create_command2 = "/usr/local/airflow/dags/basant/job2.sh "
t2 = BashOperator(
    task_id='job2',
    bash_command=create_command2,
    dag=dag
        )


t1 >> t2

2-й Внешний Dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from time import gmtime, strftime
import airflow

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['aaaa@aaa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    'PARALLEL_EXECUTION_TEST_DAG_2',
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1)
# t1, t2 and t3 are examples of tasks created by instantiating operators
create_command1 = "/usr/local/airflow/dags/basant/job3.sh "
t1 = BashOperator(
    task_id='job3',
    bash_command=create_command1,
    dag=dag
        )
create_command2 = "/usr/local/airflow/dags/basant/job4.sh "
t2 = BashOperator(
    task_id='job4',
    bash_command=create_command2,
    dag=dag
        )


t1 >> t2

Вышеупомянутый поток выглядит как это: Job1 -> trigger-test-dag-1 -> sensor-test-dag-1 -> trigger-test-dag-2 -> sensor-test-dag-2

Мой Job1 и trigger-test-dag1 работает успешно, но sensor-test-dag-1 работает навсегда, и в файле журнала я получаю сообщение вроде:

[2020-02-06 12:46:39,703] {{external_task_sensor.py:113}} INFO - Poking for PARALLEL_EXECUTION_TEST_DAG_1.job2 on 2020-02-06T12:14:45.605733+00:00 ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...