Как отформатировать макросы в Airflow? - PullRequest
0 голосов
/ 16 сентября 2018

У меня есть следующее:

EXEC_DATE1 = '{{ macros.ds_add(ds, 1) }}'


EXEC_DATE2 = '{{ execution_date }}'

Я хочу создать переменную пути, которая выглядит следующим образом:

path1 = EXEC_DATE1 + '/' + HH:MM (of EXEC_DATE1)
path2 = EXEC_DATE2 + '/' + HH:MM (of EXEC_DATE2)

В конце концов это должно быть как:

2018-09-16/10:41

Как я могу это сделать?

Я пытался:

EXEC_DATE = '{{ execution_date }}'
EXEC_DATE = EXEC_DATE.strftime('%Y-%m-%d/%H:%M')

но это дает:

'str' object has no attribute 'strftime'

EDIT: Мой код:

EXEC_TIMESTAMP_PATH = "{{  execution_date.strftime('%Y-%m-%d/%H:%M') }}"
EXEC_DATE = "{{  execution_date.strftime('%H:%M') }}"
EXEC_TIME = "{{  mexecution_date.strftime('%Y-%m-%d') }}"

task3_op= BashOperator(
    task_id='task3',
    params={'EXEC_DATE':EXEC_DATE, 'EXEC_TIME':EXEC_TIME},
    bash_command="""python3 script.py '{{ var.value.task3_variable }}' '{{ params.EXEC_DATE }}' '{{ params.EXEC_TIME }}' 'file.json'""",
    dag=dag)

Это не работает. Параметры не отображаются.

Ответы [ 2 ]

0 голосов
/ 17 сентября 2018

Что конкретно не работает в коде вашего вопроса, так это то, что на вашем params нет запуска шаблонов (расширение макросов), а на вашем bash_command работает. Поэтому я держу пари, что представленный шаблон bash_command похож на E.G.:

python3 script.py 'task3_variable's value' '{{  execution_date.strftime('%H:%M') }}' '{{  execution_date.strftime('%Y-%m-%d') }}' 'file.json'

Это больше не расширяется, и ', вероятно, отбрасывает смысл и для Bash, потому что фактически у вас есть:

params = { 'EXEC_DATE': "{{  execution_date.strftime('%H:%M') }}",
           'EXEC_TIME': "{{  execution_date.strftime('%Y-%m-%d') }}" }

Таким образом, удаление одного уровня расширения макроса путем непосредственного использования шаблонов, которые вы установили в params вместо bash_command, работает для вас лучше.

task3_op = BashOperator(
    task_id='task3',
    bash_command="""
python3 script.py '{{ var.value.task3_variable }}' \
'{{execution_date.strftime('%H:%M')}}' '{{execution_date.strftime('%Y-%m-%d')}}' file.json
    """,
    dag=dag)

Вы должны использовать команду airflow render или проверять визуализированное представление в подробностях пользовательского интерфейса экземпляра задачи при отладке подобных проблем.

Вероятно, он покажет вам, что не так с вашими предполагаемыми расширениями макросов.
Этот пример должен выглядеть примерно так:

python3 script.py 'task3_variable's value' \
'12:21' '2018-09-16' file.json

Вы надеялись проложить два пути на основе дат; попробовать:

task3_op= BashOperator(
    task_id='task3',
    bash_command="""
python3 script.py \
  '{{ var.value.task3_variable }}' \
  '{{(execution_date + macros.timedelta(days=1).strftime('%Y-%m-%d/%H:%M')}}' \
  '{{execution_date.strftime('%Y-%m-%d/%H:%M')}}' \
  file.json
    """,
    dag=dag)

Воздушный поток 1.10 также добавил next_execution_date, что может работать лучше, чем добавление дня к execution_date, если ваш интервал ежедневный.

0 голосов
/ 16 сентября 2018

Вам нужно сделать это следующим образом:

EXEC_DATE = "{{ execution_date.strftime('%Y-%m-%d/%H:%M') }}"

В фигурных скобках следует использовать strftime.

Если вам нужна дата следующего исполнения, используйте следующее:

EXEC_DATE = "{{ next_execution_date.strftime('%Y-%m-%d/%H:%M') }}"

А если вы просто хотите добавить timedelta:

EXEC_DATE = "{{ (execution_date + macros.timedelta(days=1)).strftime('%Y-%m-%d/%H:%M') }}"

Ваш код может быть следующим:

BASH_COMMAND="""
python3 script.py {{ var.value.task3_variable }} {{  execution_date.strftime('%H:%M') }} {{  execution_date.strftime('%Y-%m-%d') }} file.json
"""

task3_op= BashOperator(
    task_id='task3',
    bash_command=BASH_COMMAND,
    dag=dag)
...