Другие сообщения по этому / похожему вопросу не помогли мне.Так что я боюсь, что я упускаю из виду что-то очевидное, но здесь идет.В нескольких группах DAG Airflow в моей настройке используется BranchPythonOperator, один из которых никогда не выполняет определенную ветку.Вот соответствующий код:
def check_transport():
date = datetime.today().day;
if date == 15 or date == 16:
return 'skip_transport'
else:
return 'transport_data'
transport_data = BashOperator(
task_id = 'transport_data',
bash_command = '**Bash command to transport data is here and correct**',
dag = dag)
skip_transport = DummyOperator(
task_id = 'skip_transport',
dag = dag)
transport_check = BranchPythonOperator(
task_id = 'transport_check',
python_callable = check_transport,
dag = dag)
transport_check.set_downstream(transport_data)
transport_check.set_downstream(skip_transport)
Древовидное представление DAG показывает 01/15, что оно правильно следовало за веткой skip_transport, помечая его как успешное и помечая transport_data как пропущенное.В другие дни skip_transport показывается как пропущенный, а transport_data - белый (без статуса), и эта ветвь не выполняется.
Журнал проверки_транспорта показывает, что он намерен выполнить правильные действия:
[2019-01-30 07:44:17,786] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,786] {models.py:1342} INFO - Executing <Task(BranchPythonOperator): transport_check> on 2019-01-29 03:30:00
[2019-01-30 07:44:17,835] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,835] {python_operator.py:81} INFO - Done. Returned value was: transport_data
[2019-01-30 07:44:17,835] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,835] {python_operator.py:106} INFO - Following branch transport_data
[2019-01-30 07:44:17,836] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,835] {python_operator.py:107} INFO - Marking other directly downstream tasks as skipped
[2019-01-30 07:44:17,861] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,861] {python_operator.py:119} INFO - Done.
Я попытался добавить trigger_rule = "all_done"
в PythonBranchOperator без изменений в поведении.