Воздушный поток: Как шаблонизировать или передать вывод функции Python Callable в качестве аргументов другим задачам? - PullRequest
1 голос
/ 21 марта 2019

Я новичок в Airflow и работаю над тем, чтобы сделать мой конвейер ETL более пригодным для повторного использования. Первоначально у меня было несколько строк кода верхнего уровня, который определял бы job_start на основе нескольких пользовательских параметров ввода, но в ходе многих поисков я обнаружил, что это сработает при каждом сердцебиении , что вызывает нежелательное поведение в усечение таблицы.

Теперь я изучаю, как обернуть этот код верхнего уровня в Python Callable, чтобы он был защищен от обновления, но я не уверен, как лучше передать выходные данные другим моим задачам. Суть моего кода ниже:

def get_job_dts(): 

     #Do something to determine the appropriate job_start_dt and job_end_dt

     #Package up as a list as inputs to other PythonCallables using op_args

     job_params = [job_start_dt, job_end_dt]

     return job_params

t0 = PythonOperator(
    task_id = 'get_dates'
    python_callable = get_job_dts
    dag=dag
)

t1 = PythonOperator(
     task_id = 'task_1'
     ,python_callable=first_task
     ,op_args=job_params #<-- How do I send job_params to op_args??
     ,dag=dag
)

t0 >> t1

Я искал вокруг и слышал упоминания о шаблонах jinja, переменных или xcoms, но я не совсем уверен, как это реализовать. У кого-нибудь есть пример, который я мог бы посмотреть, где я могу сохранить этот список в переменную, которая может использоваться моими другими задачами?

Ответы [ 3 ]

1 голос
/ 21 марта 2019

Лучший способ сделать это - поместить ваше значение в XCom в get_job_dts и извлечь значение из Xcom в first_task.

def get_job_dts(**kwargs): 

     #Do something to determine the appropriate job_start_dt and job_end_dt

     #Package up as a list as inputs to other PythonCallables using op_args

    job_params = [job_start_dt, job_end_dt]

    # Push job_params into XCom
    kwargs['ti'].xcom_push(key='job_params', value=job_params)
    return job_params


def first_task(ti, **kwargs):
    # Pull job_params into XCom
    job_params = ti.xcom_pull(key='job_params')
    # And then do the rest


t0 = PythonOperator(
    task_id = 'get_dates'
    python_callable = get_job_dts
    dag=dag
)

t1 = PythonOperator(
    task_id = 'task_1',
    provide_context=True,
    python_callable=first_task,
    op_args=job_params,
    dag=dag
)

t0 >> t1
0 голосов
/ 23 марта 2019

Поскольку вы упомянули динамическое изменение времени начала и окончания задачи, я предположил, что вам нужно создать динамический dag, а не просто передать args dag. В частности, изменение времени начала и интервала без изменения имени метки приведет к неожиданному результату, поэтому настоятельно рекомендуется этого не делать. Таким образом, вы можете обратиться к этой ссылке , чтобы увидеть, может ли эта стратегия помочь.

0 голосов
/ 21 марта 2019

Как упоминалось RyantheCoder , XCOM - это путь.Моя реализация ориентирована на учебник, где я неявно выполняю push автоматически из возвращаемого значения в PythonCallable.

Я все еще смущен разницей в передаче (ti, ** kwargs) и использования (** context) функции, которая тянет.Кроме того, откуда берется "ти"?

Любые разъяснения приветствуются.

def get_job_dts(**kwargs): 

     #Do something to determine the appropriate job_start_dt and job_end_dt

     #Package up as a list as inputs to other PythonCallables using op_args

     job_params = [job_start_dt, job_end_dt]

     # Automatically pushes to XCOM, refer to: Airflow XCOM tutorial: https://airflow.apache.org/concepts.html?highlight=xcom#xcoms
     return job_params

def first_task(**context):

    # Change task_ids to whatever task pushed the XCOM vars you need, rest are standard notation
    job_params = job_params = context['task_instance'].xcom_pull(task_ids='get_dates')

    # And then do the rest


t0 = PythonOperator(
    task_id = 'get_dates'
    python_callable = get_job_dts
    dag=dag
)

t1 = PythonOperator(
    task_id = 'task_1',
    provide_context=True,
    python_callable=first_task,
    dag=dag
)

t0 >> t1
...