Я создаю группу обеспечения доступности баз данных, и ей нужны функциональные возможности для установки глобальных переменных с использованием kwargs, переданных из POST Json, используемого для запуска задания.До сих пор я пытался так:
import airflow
from airflow import DAG
from datetime import timedelta
DAG_Name = 'dag_test'
DEFAULT_ARGS = {
'owner': '...',
'depends_on_past': False,
'email': ['...'],
'email_on_failure': True,
'start_date': datetime(2020,8,31)
}
dag = DAG(DAG_Name, default_args=DEFAULT_ARGS, dagrun_timeout=timedelta(hours=2))
snap_date = ''
output_loc = ''
recast = ''
def define_param(**kwargs):
global snap_date
global output_loc
global recast
snapshot = str(kwargs['dag_run'].conf['snap_date'])
output_s3 = kwargs['dag_run'].conf['output_loc']
recast = str(kwargs['dag_run'].conf['recast'])
DEFINE_PARAMETERS = PythonOperator(
task_id='DEFINE_PARAMETERS',
python_callable=define_param,
provide_context=True,
dag=dag)
Но это не работает.Как бы я использовал kwargs для установки глобальных переменных dag?