Вы можете сделать это с помощью макроса execution_date
.Однако следует помнить, что это плохо названная концепция в воздушном потоке.Это представляет начало запланированного интервала периода.Он не изменится в том же режиме, даже если задача будет перезапущена вручную.Он там для поддержки идемпотентных обновлений данных.Что, честно говоря, является оптимальным способом приблизиться к конвейерам данных.В вашем случае, хотя вы и сказали в другом месте, что ваш api для извлечения данных берет дату начала и предоставляет все данные до текущей даты, что не способствует идемпотентной обработке, хотя вы можете выбросить данные после указанного отрезания.
Таким образом, вместо этого вы можете просто взять дату после завершения обработки данных и сохранить ее на потом.Вы можете сохранить в переменной воздушного потока.Вы можете заметить, однако, что время, которое вы получите из команды date, показанной ниже, будет позже, чем в последний раз, когда данные, которые вы, возможно, получили из вашего api-вызова process_data для всех данных с даты начала.Поэтому может быть лучше, если ваш шаг обработки выводит фактическую последнюю дату и время данных, обработанных в качестве последней строки стандартного вывода (который захватывается BashOperator для xcom).
EG
from airflow.models import Variable, DAG
from datetime import datetime
def pyop_fun(**context):
# You could have used execution_date here and in the next operator
# to make the operator rerun safe.
# date_string = context['execution_date'].strftime('%Y-%m-%d %H:%M')
# But elsewhere you said your api is always giving you the up-to-the-minute data.
# So maybe getting the date from the prior task would work better for you.
Variable.set(
'process_last_run',
context['task_instance'].xcom_pull(task_ids='process_data')
with dag as DAG(…):
pyop = PythonOperator(
task_id='set_process_last_run',
callable=pyop_fun,
provide_context=True, …)
shop = BashOperator(
task_id='process_data',
bash_command='''
process_data "{{var.value.process_last_run}}";
date -u +%Y-%m-%d\ %H:%M''',
xcom_push=True, …)
shop >> pyop
# Because the last output line of a BashOperator is pushed into xcom for that
# task id with the default key, it can be pulled by the PythonOperator and
# stored in a variable.