Предупреждение об устаревании воздушного потока Переданы неверные аргументы - PullRequest
0 голосов
/ 11 ноября 2018

У меня есть следующий код на Airflow 1.9:

import_op = MySqlToGoogleCloudStorageOperator(
    task_id='import',
    mysql_conn_id='oproduction',
    google_cloud_storage_conn_id='gcpm',
    provide_context=True,
    approx_max_file_size_bytes = 100000000, #100MB per file
    sql = 'import.sql',
    params={'next_to_import': NEXT_TO_IMPORT, 'table_name' : TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name_orders,
    dag=dag)

Почему он генерирует:

/ USR / местные / Библиотека / python2.7 / расстояние-пакеты / воздуха / models.py: 2160: PendingDeprecationWarning: неверные аргументы были переданы MySqlToGoogleCloudStorageOperator. Поддержка передачи таких аргументов будет сброшен в Airflow 2.0. Неверные аргументы были: * args: () ** kwargs: {'provide_context': True} категория = PendingDeprecationWarning

В чем проблема с provide_context? Насколько мне известно, это необходимо для использования params.

1 Ответ

0 голосов
/ 11 ноября 2018

provide_context не требуется для params.

Параметр

params (тип dict) можно передать любому оператору.

В основном вы используете provide_context с PythonOperator, BranchPythonOperator. Хороший пример: https://airflow.readthedocs.io/en/latest/howto/operator.html#pythonoperator.

MySqlToGoogleCloudStorageOperator не имеет параметра provide_context, поэтому он передается в **kwargs, и вы получаете предупреждение об устаревании.

Если вы проверите строку документации PythonOperator для provide_context:

Если установлено значение true, Airflow будет передавать набор аргументов ключевых слов, которые могут использоваться в вашей функции. Этот набор kwargs точно соответствует что вы можете использовать в шаблонах Jinja. Чтобы это работало, нужно определить **kwargs в заголовке вашей функции.

Он имеет следующий код, если вы проверяете исходный код:

if self.provide_context:
            context.update(self.op_kwargs)
            context['templates_dict'] = self.templates_dict
            self.op_kwargs = context

Таким образом, в простых терминах он передает следующий словарь с templates_dict в вашу функцию pass в python_callable:

{
    'END_DATE': ds,
    'conf': configuration,
    'dag': task.dag,
    'dag_run': dag_run,
    'ds': ds,
    'ds_nodash': ds_nodash,
    'end_date': ds,
    'execution_date': self.execution_date,
    'latest_date': ds,
    'macros': macros,
    'params': params,
    'run_id': run_id,
    'tables': tables,
    'task': task,
    'task_instance': self,
    'task_instance_key_str': ti_key_str,
    'test_mode': self.test_mode,
    'ti': self,
    'tomorrow_ds': tomorrow_ds,
    'tomorrow_ds_nodash': tomorrow_ds_nodash,
    'ts': ts,
    'ts_nodash': ts_nodash,
    'yesterday_ds': yesterday_ds,
    'yesterday_ds_nodash': yesterday_ds_nodash,
}

Так что это можно использовать в функции следующим образом:

def print_context(ds, **kwargs):
    pprint(kwargs)
    ti = context['task_instance']
    exec_date = context['execution_date']
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)
...