Макросы воздушного потока в Python Operator - PullRequest
0 голосов
/ 05 июня 2018

Я пытаюсь использовать макросы Airflow в своем операторе Python, но получаю сообщение «airflow: error: нераспознанные аргументы:»

Поэтому я импортирую функцию, имеющую 3 позиционных аргумента: ( sys.argv, start_date, end_date ), и я надеюсь сделать start_date и end_date датой выполнения в Airflow.

Аргументы функции выглядят примерно так

def main(argv,start_date,end_date):

Вот моя задача в DAG:

t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=main,
    op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
    dag=dag)

Ответы [ 2 ]

0 голосов
/ 06 июня 2018

Вы можете "обернуть" вызов функции main следующим образом:

t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=lambda **context: main([], context["ds"], context["ds"]),
    dag=dag)

Если лямбда не ваша чашка чая, вы можете определить функцию, вызвать ее и получить еепозвать main.

0 голосов
/ 05 июня 2018

Поскольку вы передаете даты, которые должны отображаться с помощью Airflow, вам нужно использовать параметр templates_dict в Операторе Python.Это единственное поле, которое Airflow распознает как содержащие шаблоны.

Вы можете создать пользовательский оператор Python, который распознает больше полей как шаблоны, скопировав существующий оператор и добавив соответствующие поля в template_fields кортеж.

def main(**kwargs):
    argv = kwargs.get('templates_dict').get('argv')
    start_date = kwargs.get('templates_dict').get('start_date')
    end_date = kwargs.get('templates_dict').get('end_date')


t1 = PythonOperator(task_id='Pull_DCM_Report',
                    provide_context=True,
                    python_callable=main,
                    templates_dict={'argv': sys.argv,
                                    'start_date': '{{ yesterday_ds }}',
                                    'end_date': '{{ ds }}'},
                    dag=dag)
...