Как сослаться на дату выполнения DAG внутри `KubernetesPodOperator`? - PullRequest
2 голосов
/ 22 апреля 2019

Я пишу DAG Airflow для извлечения данных из API и сохранения их в базе данных, которой я владею. Следуя рекомендациям, изложенным в Мы все используем Airflow Wrong , я пишу DAG как последовательность KubernetesPodOperator s, которые выполняют довольно простые функции Python в качестве точки входа в образ Docker.

Проблема, которую я пытаюсь решить, заключается в том, что этот DAG должен извлекать данные только для execution_date.

Если бы я использовал PythonOperator ( doc ), я мог бы использовать аргумент provide_context, чтобы сделать дату выполнения доступной для функции. Но, судя по документации оператора KubernetesPodOperator , кажется, что оператор Kubernetes не имеет аргумента, который делает то, что делает provide_context.

Мое лучшее предположение заключается в том, что вы можете использовать команду arguments для передачи диапазона дат, и, поскольку она является шаблонной, вы можете ссылаться на нее следующим образом:

my_pod_operator = KubernetesPodOperator(
    # ... other args here
    arguments=['python', 'my_script.py', '{{ ds }}'],
    # arguments continue
)

И тогда вы получите дату начала, как и любой другой аргумент, предоставленный файлу Python, запущенному как скрипт, с помощью sys.argv.

Это правильный способ сделать это?

Спасибо за помощь.

1 Ответ

3 голосов
/ 23 апреля 2019

Да, это правильный способ сделать это.

У каждого оператора будет template_fields.Все параметры, перечисленные в template_fields, могут отображать шаблоны Jinja2 и макросы Airflow.

Для KubernetesPodOperator, если вы отметите docs , вы найдете:

template_fields = ['cmds', 'arguments', 'env_vars', 'config_file']

, которыйозначает, что вы можете передать '{{ ds }}' любому из четырех параметров, перечисленных выше.

...