Запустить задачу, когда другой закончил с успехом - PullRequest
0 голосов
/ 19 июня 2019

Мой пример использования следующий:

  • Task A генерирует dataset с использованием некоторых исходных необработанных данных
  • Task B выполняет некоторый код, используя dataset в качестве ввода
  • Task C выполняет какой-то другой код, используя dataset в качестве ввода

Три задачи запланированы для ежедневного выполнения, а Task B и Task C запланированы для запуска достаточное время после Task A, и они просто не выполняются, если входной набор данных не был сгенерирован для некоторых причина.

В качестве первого улучшения я добавил ExternalTaskSensor в Task B и Task C, но это просто позволяет избежать их запуска, если Task A еще не завершен или не выполнен.

Тем не менее, ExternalTaskSensor, кажется, не работает нормально с обратной засыпкой (это довольно хрупко, поскольку зависит только от дата выполнения , плюс, если Task A будет запущен снова, Task B и Task C не узнают).

Решение 1 (не применимо): Я видел вопрос этого SO: В потоке воздуха, есть ли хороший способ вызвать задачу другого дага?

Это не идеально для меня, потому что я хотел бы оставить Task A в неведении зависимых задач и обрабатывать логику в Task B и Task C (или внешне). Причина в том, что в будущем будут добавлены другие задачи, использующие вывод Task A (из разных групп организации), и нежелательно каждый раз обновлять Task A.

Резюме Я хотел бы вызвать Task B и Task C, если и только если Task A был выполнен с успехом (независимо, если он был запланирован или запущен вручную), без изменения Task A для достижения этого.

1 Ответ

0 голосов
/ 20 июня 2019

В соответствии с вашим сценарием единственное понятие, которое я могу придумать, это SubDags.(См. ПРЕДУПРЕЖДЕНИЕ перед внедрением)

SubDagOperator позволяет вам прикрепить набор задач к вашему task A.См. Приведенный ниже код.

dag = DAG('parent_dag', description='Parent',
          schedule_interval='@daily',
          start_date=datetime.now())

task_a = DummyOperator(dag=dag, task_id='task_a')

subdag_task = SubDag(task_id='load_tasks',
    subdag=load_subdag('parent_dag', 'dependent_tasks'),
    dag=dag)

task_a >> subdag_task

Теперь в отдельном файле вы определяете свою функцию load_subdag.

def load_subdag(parent_dag_name, child_dag_name):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_name, child_dag_name),
        schedule_interval="@daily",
    )
    with dag_subdag:
        task_b = DummyOperator(
            task_id='load_subdag_task_b',
            dag=dag_subdag)

       task_c = DummyOperator(
            task_id='load_subdag_task_c',
            dag=dag_subdag)

    return dag_subdag

ПРЕДУПРЕЖДЕНИЕ (красным и жирным): задачи SubDag занимают слоты в вашемрабочий, как личинки.Пожалуйста, поймите предостережения полностью, прежде чем перейти к этому. AIRFLOW-74 дает представление о том, насколько это может быть плохо.Многие разработчики по той же причине отвергают его.

...