У меня есть график воздушного потока с условной ветвью, определенной как
class BranchFlags(Enum):
yes = "yes"
no = "no"
...
for table in list_of_tables # type list(dict)
task_1 = BashOperator(
task_id='task_1_%s' % table["conf1"],
bash_command='bash script1.sh %s' % table["conf1"],
dag=dag)
if table["branch_flag"] == BranchFlags.yes:
consolidate = BashOperator(
task_id='task_3_%s' % table["conf2"],
bash_command='python %s/consolidate_parquet.py %s' % table["conf2"],
dag=dag)
task_3 = BashOperator(
task_id='task_3_%s' % table["conf3"],
bash_command='bash script3.sh %s' % table["conf3"],
dag=dag)
task_1 >> task_3
if table["branch_flag"] == BranchFlags.yes:
task_1 >> task_2
, и вот график в пользовательском интерфейсе воздушного потока из моего фактического кода:
Обратите внимание, что, хотя более длинные части графика работают нормально, одиночная ветвь не выполняется для одной последовательности, которая должна была ветвиться. При просмотре журналов для задачи я вижу
*** Экземпляр задачи не существует в БД
Это странно для меня, так как якобы планировщик БДвидит задачу, поскольку она появляется на графике веб-интерфейса. Не уверен, что здесь происходит, и добавление других изменений в файл dag .py
отображается на графике и выполняется планировщиком при запуске графика. И при попытке просмотра задач Сведения об экземпляре задачи выдает ошибку
Задача [dagname.task_3_qwerty], по-видимому, не существует в данный момент
Выполняется airflow resetdb
(какЯ видел в других сообщениях) ничего не делает для этой проблемы.
Обратите внимание, что предполагается, что короткая ветвь работает одновременно с более длинной ветвью (не как ни то, ни другое).
Кто-нибудь знает, почему это происходит, или есть советы по отладке?