Airflow BranchPythonOperator не выполняет одну ветку - PullRequest
0 голосов
/ 30 января 2019

Другие сообщения по этому / похожему вопросу не помогли мне.Так что я боюсь, что я упускаю из виду что-то очевидное, но здесь идет.В нескольких группах DAG Airflow в моей настройке используется BranchPythonOperator, один из которых никогда не выполняет определенную ветку.Вот соответствующий код:

def check_transport():
   date = datetime.today().day;
   if date == 15 or date == 16:
       return 'skip_transport'
   else:
       return 'transport_data'

transport_data = BashOperator(
    task_id = 'transport_data',
    bash_command = '**Bash command to transport data is here and correct**',
    dag = dag)

skip_transport = DummyOperator(
    task_id = 'skip_transport',
    dag = dag)

transport_check = BranchPythonOperator(
    task_id = 'transport_check',
    python_callable = check_transport,
    dag = dag)
transport_check.set_downstream(transport_data)
transport_check.set_downstream(skip_transport)

Древовидное представление DAG показывает 01/15, что оно правильно следовало за веткой skip_transport, помечая его как успешное и помечая transport_data как пропущенное.В другие дни skip_transport показывается как пропущенный, а transport_data - белый (без статуса), и эта ветвь не выполняется.

Журнал проверки_транспорта показывает, что он намерен выполнить правильные действия:

[2019-01-30 07:44:17,786] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,786] {models.py:1342} INFO - Executing <Task(BranchPythonOperator): transport_check> on 2019-01-29 03:30:00
[2019-01-30 07:44:17,835] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,835] {python_operator.py:81} INFO - Done. Returned value was: transport_data
[2019-01-30 07:44:17,835] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,835] {python_operator.py:106} INFO - Following branch transport_data
[2019-01-30 07:44:17,836] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,835] {python_operator.py:107} INFO - Marking other directly downstream tasks as skipped
[2019-01-30 07:44:17,861] {base_task_runner.py:95} INFO - Subtask: [2019-01-30 07:44:17,861] {python_operator.py:119} INFO - Done.

Я попытался добавить trigger_rule = "all_done" в PythonBranchOperator без изменений в поведении.

...