Что конкретно не работает в коде вашего вопроса, так это то, что на вашем params
нет запуска шаблонов (расширение макросов), а на вашем bash_command
работает. Поэтому я держу пари, что представленный шаблон bash_command
похож на E.G.:
python3 script.py 'task3_variable's value' '{{ execution_date.strftime('%H:%M') }}' '{{ execution_date.strftime('%Y-%m-%d') }}' 'file.json'
Это больше не расширяется, и '
, вероятно, отбрасывает смысл и для Bash, потому что фактически у вас есть:
params = { 'EXEC_DATE': "{{ execution_date.strftime('%H:%M') }}",
'EXEC_TIME': "{{ execution_date.strftime('%Y-%m-%d') }}" }
Таким образом, удаление одного уровня расширения макроса путем непосредственного использования шаблонов, которые вы установили в params
вместо bash_command
, работает для вас лучше.
task3_op = BashOperator(
task_id='task3',
bash_command="""
python3 script.py '{{ var.value.task3_variable }}' \
'{{execution_date.strftime('%H:%M')}}' '{{execution_date.strftime('%Y-%m-%d')}}' file.json
""",
dag=dag)
Вы должны использовать команду airflow render или проверять визуализированное представление в подробностях пользовательского интерфейса экземпляра задачи при отладке подобных проблем.
Вероятно, он покажет вам, что не так с вашими предполагаемыми расширениями макросов.
Этот пример должен выглядеть примерно так:
python3 script.py 'task3_variable's value' \
'12:21' '2018-09-16' file.json
Вы надеялись проложить два пути на основе дат; попробовать:
task3_op= BashOperator(
task_id='task3',
bash_command="""
python3 script.py \
'{{ var.value.task3_variable }}' \
'{{(execution_date + macros.timedelta(days=1).strftime('%Y-%m-%d/%H:%M')}}' \
'{{execution_date.strftime('%Y-%m-%d/%H:%M')}}' \
file.json
""",
dag=dag)
Воздушный поток 1.10 также добавил next_execution_date
, что может работать лучше, чем добавление дня к execution_date
, если ваш интервал ежедневный.