Как вызвать задачу по завершении двух других задач в Airflow DAG в python? - PullRequest
4 голосов
/ 15 апреля 2020

У меня есть сценарий, подобный приведенному ниже. Task 3 должно срабатывать при успешном завершении Task1 и Task2. Я поделился своим кодом ниже, вы можете сообщить мне, что в нем отсутствует для сценария?

enter image description here

Мой код

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('trigger_job', default_args=default_args, schedule_interval='@daily')

wait_for_task1 = AwsGlueCatalogPartitionSensor(
    task_id='Task1',
    database_name='db',
    table_name='table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

wait_for_task2 = AwsGlueCatalogPartitionSensor(
    task_id='Task2',
    database_name='db',
    table_name='table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_sql = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="schema_do_lines.sql",
    params={'limit': '50'},
    dag=dag
)

execute_sql.set_upstream(wait_for_task1)

Как это можно сделать в потоке воздуха, используя python?

1 Ответ

1 голос
/ 15 апреля 2020

Вам нужно два один простая вещь:

  1. Установите правильные зависимости. На данный момент вы кодировали, что execute_sql зависит от wait_for_task1. Вы должны указать, что execute_sql также зависит от wait_for_task2, добавив строку execute_sql.set_upstream(wait_for_task2) в конце вашего кода.
  2. БОНУС: Возможно, вам придется установить параметр trigger_rule в определении вашей задачи , Вы можете прочитать больше об этом в этой документации . В вашем конкретном случае c нет необходимости задавать его явно , потому что по умолчанию он установлен на all_success (т.е. выполняет задачу, только если все родители преуспели), и, таким образом, execute_sql будет срабатывает только тогда, когда обе задачи, от которых это зависит, были выполнены успешно.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...