provide_context
не требуется для params
.
Параметр
params
(тип dict
) можно передать любому оператору.
В основном вы используете provide_context
с PythonOperator
, BranchPythonOperator
. Хороший пример: https://airflow.readthedocs.io/en/latest/howto/operator.html#pythonoperator.
MySqlToGoogleCloudStorageOperator
не имеет параметра provide_context
, поэтому он передается в **kwargs
, и вы получаете предупреждение об устаревании.
Если вы проверите строку документации PythonOperator
для provide_context
:
Если установлено значение true, Airflow будет передавать набор аргументов ключевых слов, которые могут
использоваться в вашей функции. Этот набор kwargs точно соответствует
что вы можете использовать в шаблонах Jinja. Чтобы это работало, нужно
определить **kwargs
в заголовке вашей функции.
Он имеет следующий код, если вы проверяете исходный код:
if self.provide_context:
context.update(self.op_kwargs)
context['templates_dict'] = self.templates_dict
self.op_kwargs = context
Таким образом, в простых терминах он передает следующий словарь с templates_dict
в вашу функцию pass в python_callable
:
{
'END_DATE': ds,
'conf': configuration,
'dag': task.dag,
'dag_run': dag_run,
'ds': ds,
'ds_nodash': ds_nodash,
'end_date': ds,
'execution_date': self.execution_date,
'latest_date': ds,
'macros': macros,
'params': params,
'run_id': run_id,
'tables': tables,
'task': task,
'task_instance': self,
'task_instance_key_str': ti_key_str,
'test_mode': self.test_mode,
'ti': self,
'tomorrow_ds': tomorrow_ds,
'tomorrow_ds_nodash': tomorrow_ds_nodash,
'ts': ts,
'ts_nodash': ts_nodash,
'yesterday_ds': yesterday_ds,
'yesterday_ds_nodash': yesterday_ds_nodash,
}
Так что это можно использовать в функции следующим образом:
def print_context(ds, **kwargs):
pprint(kwargs)
ti = context['task_instance']
exec_date = context['execution_date']
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag,
)