Воздушный поток: динамически выводить dag_id для вызова из другого DAG - PullRequest
0 голосов
/ 08 февраля 2020

Я пытаюсь получить имя DAG для динамического вызова в другую DAG. В следующей задаче «trigger_transform_dag» не выполняется. Не могли бы вы помочь мне с динамическим выводом идентификатора dag для задачи 'trigger_transform_dag'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': airflow.utils.dates.days_ago(0),
}

def run_dag(**context):
    file_path='ABC'
    context['ti'].xcom_push(key = 'key1', value = file_path)
    return 1


def check_file_name(**context):
    pulled_value_1 = context['ti'].xcom_pull(task_ids = 'run_dataflow_template',key = 'key1')
    if pulled_value_1 = 'ABC':
       push_value = 'sample1'
       return push_value
    else:
       push_value = 'sample2'
       return push_value


    return pulled_value_1



with DAG('sample',
          default_args=default_args,
          schedule_interval='10 * * * *',
          start_date=datetime(2017, 3, 20),
          max_active_runs=1,
          catchup=False) as dag:


    t1 = PythonOperator(
            task_id='run_dataflow_template',
            provide_context=True,
            python_callable=run_dag
    )

    t2 = TriggerDagRunOperator(
    task_id="trigger_transform_dag",
    provide_context=True,
    trigger_dag_id=check_file_name()
    )


    end = DummyOperator(
        trigger_rule='one_success',
        task_id='end')


    t1 >> t2 >> end

1 Ответ

0 голосов
/ 08 февраля 2020

Я не знаю, есть ли более простой способ, но вы можете создать пользовательский оператор, который черпает вдохновение из TriggerDagRunOperator (https://github.com/apache/airflow/blob/master/airflow/operators/dagrun_operator.py) и использует переданный Callable для получения функции.

Что-то, что я взломал вместе очень быстро (можно определенно улучшить):

from airflow.models import DAG
from airflow.utils.dates import days_ago, timedelta
from airflow.operators.dagrun_operator import TriggerDagRunOperator
import random
import datetime
from typing import Dict, Optional, Union, Callable

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults


class TriggerDagRunWithFuncOperator(BaseOperator):
    """
    Triggers a DAG run for a specified ``dag_id``
    :param trigger_dag_id_f: the dag_id function to trigger
    :type trigger_dag_id_f: Callable
    :param conf: Configuration for the DAG run
    :type conf: dict
    :param execution_date: Execution date for the dag (templated)
    :type execution_date: str or datetime.datetime
    """

    template_fields = ("execution_date", "conf")
    ui_color = "#ffefeb"

    @apply_defaults
    def __init__(
        self,
        get_dag_name_f: Callable,
        conf: Optional[Dict] = None,
        execution_date: Optional[Union[str, datetime.datetime]] = None,
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.conf = conf
        self.get_dag_name_f = get_dag_name_f

        if not isinstance(execution_date, (str, datetime.datetime, type(None))):
            raise TypeError(
                "Expected str or datetime.datetime type for execution_date."
                "Got {}".format(type(execution_date))
            )

        self.execution_date: Optional[datetime.datetime] = execution_date  # type: ignore

    def execute(self, context: Dict):
        if isinstance(self.execution_date, datetime.datetime):
            run_id = "trig__{}".format(self.execution_date.isoformat())
        elif isinstance(self.execution_date, str):
            run_id = "trig__{}".format(self.execution_date)
            self.execution_date = timezone.parse(self.execution_date)  # trigger_dag() expects datetime
        else:
            run_id = "trig__{}".format(timezone.utcnow().isoformat())

        dag_id_to_call = self.get_dag_name_f()
        # Ignore MyPy type for self.execution_date because it doesn't pick up the timezone.parse() for strings
        trigger_dag(
            dag_id=dag_id_to_call,
            run_id=run_id,
            conf=self.conf,
            execution_date=self.execution_date,
            replace_microseconds=False,
        )


args={
    'owner': 'arocketman',
    'start_date': days_ago(1)
}

dag = DAG(dag_id='dyna_dag', default_args=args, schedule_interval=None)


def your_function():
    return 'my_sample_dag'

with dag:
    run_this_task = TriggerDagRunWithFuncOperator(
        task_id='run_this',
        get_dag_name_f=your_function
    )
...