Создание понятных человеку имен файлов в Apache Airflow vía {{run_id}} в шаблонной команде bash - PullRequest
0 голосов
/ 11 января 2019

Мой DAG выводит некоторые данные из базы данных и сохраняет их в локальном файле.

Другие задачи (в том же DagRun) будут читать, обрабатывать и т. Д.

Чтобы сохранить локально, мне нужно создать имя файла на основе dag, задачи, которая его создала, и запуска dag:

{{ dag.dag_id }}_{{ task.task_id }}_{{ run_id }}.csv

и я получаю что-то подобное:

my_dag_my_task_manual__2019-01-11T11:56:00.902937+00:00.csv

Как я могу сделать имя файла более понятным для человека? Я имею в виду, что-то вроде этого:

my_dag_my_task_manual__2019_01_11_11_56_00_902937_00_00.csv

Это шаблонная команда bash, в которой задача (BashOperator) составляет имя файла:

bash_command = \
        """
        comm -2 -3 {source_a_filepath} {source_b_filepath} > {TMP_PATH}/{{{{ dag.dag_id }}}}_{{{{ task.task_id }}}}_{{{{ run_id }}}}.csv;
        echo '{{{{ dag.dag_id }}}}_{{{{ task.task_id }}}}_{{{{ run_id }}}}'
        """.format(
            source_a_filepath=source_a_filepath,
            source_b_filepath=source_b_filepath,
            TMP_PATH=TMP_PATH)

1 Ответ

0 голосов
/ 11 января 2019

Это произойдет с сообщением name 'run_id' is not defined, потому что во время ввода параметров run_id все еще недоступно:

bash_command = \
    """
    echo '{{ params.run_id_readable }}';
    """

t1 = BashOperator(
    task_id="t1",
    bash_command=bash_command,
    params={"run_id_readable": run_id.split('.')[0].replace('T', '_').replace('-', '').replace(':', '')},
    dag=dag,
)

Это снова не удастся, потому что kwargs также недоступен:

bash_command = \
    """
    echo '{{ params.run_id_readable }}';
    """

t1 = BashOperator(
    task_id="t1",
    bash_command=bash_command,
    params={"run_id_readable": kwarg.get('run_id').split('.')[0].replace('T', '_').replace('-', '').replace(':', '')},

    dag=dag,
)

Это будет правильный путь, так как символ run_id будет доступен во время рендеринга шаблона jinja:

bash_command = \
    """
    echo '{{ run_id.split('.')[0].replace('T', '_').replace('-', '').replace(':', '') }}';
    """

t1 = BashOperator(
    task_id="t1",
    bash_command=bash_command,
    dag=dag,
)

С помощью PythonOperator вы можете воспользоваться тем, что kwargs dict доступен во время выполнения 'callable_function`:

def python_callable(**kwargs):
    """ """
    dag_id = kwargs.get('dag').dag_id
    task_id = kwargs.get('task').task_id
    run_id = kwargs.get('run_id', '-.:').split('.')[0].replace('T', '_').replace('-', '').replace(':', '')
    print("{}_{}_{}".format(dag_id, task_id, run_id))

t2 = PythonOperator(
    task_id="t2",
    provide_context=True,
    python_callable=python_callable,
    dag=dag)
...