Я новичок в Airflow и работаю над тем, чтобы сделать мой конвейер ETL более пригодным для повторного использования. Первоначально у меня было несколько строк кода верхнего уровня, который определял бы job_start на основе нескольких пользовательских параметров ввода, но в ходе многих поисков я обнаружил, что это сработает при каждом сердцебиении , что вызывает нежелательное поведение в усечение таблицы.
Теперь я изучаю, как обернуть этот код верхнего уровня в Python Callable, чтобы он был защищен от обновления, но я не уверен, как лучше передать выходные данные другим моим задачам. Суть моего кода ниже:
def get_job_dts():
#Do something to determine the appropriate job_start_dt and job_end_dt
#Package up as a list as inputs to other PythonCallables using op_args
job_params = [job_start_dt, job_end_dt]
return job_params
t0 = PythonOperator(
task_id = 'get_dates'
python_callable = get_job_dts
dag=dag
)
t1 = PythonOperator(
task_id = 'task_1'
,python_callable=first_task
,op_args=job_params #<-- How do I send job_params to op_args??
,dag=dag
)
t0 >> t1
Я искал вокруг и слышал упоминания о шаблонах jinja, переменных или xcoms, но я не совсем уверен, как это реализовать. У кого-нибудь есть пример, который я мог бы посмотреть, где я могу сохранить этот список в переменную, которая может использоваться моими другими задачами?