Есть ли способ ежедневно обновлять dag воздушного потока на основе кода определения файла dagfile? Например. обновить значения даты, которые могут использоваться в определении dag.
Для контекста: у меня есть dag воздушного потока, который ежедневно получает новые строки таблицы из удаленной базы данных и перемещает их в локальную базу данных. Для того, чтобы получить самые последние строки из удаленного, у нас есть функция, которая получает последнюю дату из локального. В настоящее время даг определен как ...
...
def get_latest_date(tablename):
# get latest import date from local table
....
for table in tables: # type list(dict)
task_1 = BashOperator(
task_id='task_1_%s' % table["tablename"],
bash_command='bash %s/task_1.sh %s' % (PROJECT_HOME, table["latest_date"]),
execution_timeout=timedelta(minutes=30),
dag=dag)
task_2 = BashOperator(
task_id='task_2_%s' % table["tablename"],
bash_command='bash %s/task_2.sh' % PROJECT_HOME,
execution_timeout=timedelta(minutes=30),
dag=dag)
task_1 >> task_2
, где таблицы - это дикты, где одно из их полей построено ранее в коде, чтобы быть строковым представлением самой последней даты для данной таблицы. При печати предполагаемой последней даты в сценарии task_1.sh, обнаруживается, что дата не обновляется каждый день. Нужен способ, чтобы список таблиц создавался заново каждый день, чтобы иметь правильные значения даты.