Перезапустите Dag Airflow Dag From Middle Task и продолжайте его до конца всех последующих задач (Возобновите Airflow DAG из любой задачи). - PullRequest
0 голосов
/ 09 октября 2018

Привет, я новичок в Apache Airflow, у меня есть зависимость: скажем,

Задача A >> Задача B >> Задача C >> Задача D >> Задача E

  1. Можно ли запустить Airflow DAG из средней задачи, скажем, из задачи C?

  2. Можно ли запустить только определенную ветвь в случае оператора ветвления в середине?

  3. Можно ли возобновить DAG Airflow с последней задачи сбоя?

  4. Если невозможно, как управлять большими DAG и избежать повторного запуска избыточных задач?

Пожалуйста, дайте мне предложения о том, как реализовать это, если это возможно.

1 Ответ

0 голосов
/ 10 октября 2018
  1. Вы не можете сделать это вручную.Если вы установили BranchPythonOperator, вы можете пропускать задачи до тех пор, пока задача не будет запущена, в соответствии с условиями, установленными в BranchPythonOperator

  2. То же, что 1.

  3. да.Вы можете очистить задачи вверх по течению до корня или вниз по течению до всех выходов из узла.

Вы можете сделать что-то вроде:

Task A >> Task B >> Task C >> Task D
Task C >> Task E

Где C - этооператор филиалаНапример:

    from datetime import date
    def branch_func():
        if date.today().weekday() == 0:
            return 'task id of D'
        else:
            return 'task id of E'


    Task_C = BranchPythonOperator(
        task_id='branch_operation',
        python_callable=branch_func,
        dag=dag)

Это будет последовательность задач в понедельник:

Task A >> Task B >> Task C >> Task D

Это будет последовательность задач в оставшуюся часть недели:

Task A >> Task B >> Task C >> Task E
...