Вы можете получить доступ к этим переменным с помощью os.environ["ENV VAR NAME"]
(обязательно import os
).Например:
import os
# ... other imports ...
dag = DAG(
dag_id="demo",
default_args=default_args,
schedule_interval="0 0 * * *",
)
def print_env_var():
print(os.environ["AIRFLOW_CTX_DAG_ID"])
print_context = PythonOperator(
task_id="print_env",
python_callable=print_env_var,
dag=dag,
)
Однако, общий способ доступа к таким переменным в задаче заключается в предоставлении контекста задачи путем установки в вашем операторе provide_context=True
.
Например:
dag = DAG(
dag_id="demo",
default_args=default_args,
schedule_interval="0 0 * * *",
)
def print_context(**context):
print(context)
print_context = PythonOperator(
task_id="print_context",
python_callable=print_context,
provide_context=True, # <====
dag=dag,
)
Переменная context
будет содержать ряд переменных, содержащих информацию о контексте задачи, в том числе те, которые указаны в вашем вопросе:
# {
# 'END_DATE': '2019-01-01',
# 'conf': <module 'airflow.configuration' from '/opt/conda/lib/python3.6/site-packages/airflow/configuration.py'>,
# 'dag': <DAG: context_demo>,
# 'dag_run': None,
# 'ds': '2019-01-01',
# 'ds_nodash': '20190101',
# 'end_date': '2019-01-01',
# 'execution_date': <Pendulum [2019-01-01T00:00:00+00:00]>,
# 'inlets': [],
# 'latest_date': '2019-01-01',
# 'macros': <module 'airflow.macros' from '/opt/conda/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
# 'next_ds': '2019-01-02',
# 'next_ds_nodash': '20190102',
# 'next_execution_date': datetime.datetime(2019, 1, 2, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
# 'outlets': [],
# 'params': {},
# 'prev_ds': '2018-12-31',
# 'prev_ds_nodash': '20181231',
# 'prev_execution_date': datetime.datetime(2018, 12, 31, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
# 'run_id': None,
# 'tables': None,
# 'task': <Task(PythonOperator): print_exec_date>,
# 'task_instance': <TaskInstance: context_demo.print_exec_date 2019-01-01T00:00:00+00:00 [None]>,
# 'task_instance_key_str': 'context_demo__print_exec_date__20190101',
# 'templates_dict': None,
# 'test_mode': True,
# 'ti': <TaskInstance: context_demo.print_exec_date 2019-01-01T00:00:00+00:00 [None]>,
# 'tomorrow_ds': '2019-01-02',
# 'tomorrow_ds_nodash': '20190102',
# 'ts': '2019-01-01T00:00:00+00:00',
# 'ts_nodash': '20190101T000000',
# 'ts_nodash_with_tz': '20190101T000000+0000',
# 'var': {'json': None, 'value': None},
# 'yesterday_ds': '2018-12-31',
# 'yesterday_ds_nodash': '20181231'
# }
Я объясняю, как обрабатывать контекст задачи внемного подробнее в этом блоге (см. «3. Передача контекста задачам»).