Airflow BashOperator: передача параметра во внешний скрипт bash - PullRequest
0 голосов
/ 25 февраля 2020

Возникли проблемы при передаче параметров во внешний bash скрипт из BashOperator. Когда я запускаю локальную команду, параметры подставляются правильно:

log_cleanup = """ echo "{{ params.BASE_LOG_FOLDER }}" """
log_cleanup_task = BashOperator(
        task_id='log_cleanup_task',
        provide_context=True,
        bash_command = log_cleanup,
        params = {'BASE_LOG_FOLDER': "/var/opt"},
        dag=dagInstance,
)

prints:  "/var/opt"   (without the double quotes)

Но если я вызываю внешний сценарий bash, параметры не подставляются в.

log_cleanup_task = BashOperator(
        task_id='log_cleanup_task',
        provide_context=True,
        bash_command= str(DAGS_FOLDER)+"/scripts/log_cleanup.sh ",
        params = {'BASE_LOG_FOLDER': "/var/opt" },
        dag=dagInstance,
)

#log_cleanup.sh:
#! /usr/bin/bash
echo "{{ params.BASE_LOG_FOLDER }}"


prints: "{{ params.BASE_LOG_FOLDER }}"    (without the double quotes)

Во внешнем сценарии bash я не могу получить параметры для замены, как они делают, когда оператор хранится в сценарии DAG .py.

Нужно ли передавать параметры в качестве аргументов командной строки вместо? Работает ли шаблонизатор jinja только с файлами .py?

1 Ответ

1 голос
/ 26 февраля 2020

Удалить пробел после "log_cleanup.sh " в bash_command

Таким образом, ваша задача должна стать:

log_cleanup_task = BashOperator(
        task_id='log_cleanup_task',
        provide_context=True,
        bash_command= "scripts/log_cleanup.sh",
        params = {'BASE_LOG_FOLDER': "/var/opt" },
        dag=dagInstance,
)

Примечание , что сценарии папка должна находиться внутри папки, содержащей файл DAG, и должна содержать относительный путь к сценарию (относительно папки, содержащей эту группу DAG)

Основной причиной, по которой вы получили ошибку TemplateNotFound, был путь, упомянутый в bash_command не распознается Jinja (шаблонизатор, используемый Airflow). Jinja распознает только путь, переданный в DAG.template_searchpath Путь по умолчанию - это папка, содержащая DAG, так что вы можете напрямую поместить папку сценариев в папку DAG, если ваша DAG находится непосредственно в $AIRFLOW_HOME/dags. Или вы можете передать путь к вашей папке в DAG.template_searchpath следующим образом:

dag = DAG("example_dag", template_searchpath="/var/opt/scripts")

# And then just pass "filename" to bash_command
log_cleanup_task = BashOperator(
        task_id='log_cleanup_task',
        provide_context=True,
        bash_command= "log_cleanup.sh ",
        params = {'BASE_LOG_FOLDER': "/var/opt" },
        dag=dag,
)
...