Определение глобальных переменных воздушного потока с использованием Kwargs, передаваемых из POST Json - PullRequest
0 голосов
/ 19 сентября 2019

Я создаю группу обеспечения доступности баз данных, и ей нужны функциональные возможности для установки глобальных переменных с использованием kwargs, переданных из POST Json, используемого для запуска задания.До сих пор я пытался так:

import airflow
from airflow import DAG
from datetime import timedelta

DAG_Name = 'dag_test'

DEFAULT_ARGS = {
    'owner': '...',
    'depends_on_past': False,
    'email': ['...'],
    'email_on_failure': True,
    'start_date': datetime(2020,8,31)
}

dag = DAG(DAG_Name, default_args=DEFAULT_ARGS, dagrun_timeout=timedelta(hours=2))

snap_date = ''
output_loc = ''
recast = ''

def define_param(**kwargs):
    global snap_date
    global output_loc
    global recast
    snapshot = str(kwargs['dag_run'].conf['snap_date'])
    output_s3 = kwargs['dag_run'].conf['output_loc']
    recast = str(kwargs['dag_run'].conf['recast'])


DEFINE_PARAMETERS = PythonOperator(
    task_id='DEFINE_PARAMETERS',
    python_callable=define_param,
    provide_context=True,
    dag=dag)

Но это не работает.Как бы я использовал kwargs для установки глобальных переменных dag?

1 Ответ

0 голосов
/ 19 сентября 2019

Используйте Variable.set, так как это сделает актуальное обновление базы данных, а также, при необходимости, проведет для вас сеанс и сериализацию.

Variable.set("snap_date", "2019-09-17")

Ссылка: https://github.com/apache/airflow/blob/1.10.1/airflow/models.py#L4558-L4569

...