Задача воздушного потока не перемещается на зависимость, но повторно запускает задачу - PullRequest
0 голосов
/ 04 июня 2018

У меня есть Airflow Workflow, который состоит из трех задач;при этом вторая задача зависит от первой, а третья задача зависит от второй.

Если я запускаю группу обеспечения доступности баз данных через веб-сервер, первая задача завершается, но затем начинает перезапускаться вместо запуска второй задачи.Следует иметь в виду, что выполнение первой задачи занимает более 130 секунд.Это происходит из-за продолжительности первого задания?

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime


default_args = {
    'owner': 'David',
    'depends_on_past': True,
    'start_date': datetime(2018,5,18),
    'email': ['email_address'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'DCM_Floodlight_Report_API',
    default_args=default_args,
    description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
    schedule_interval='30 14 * * *')

t1 = BashOperator(
    task_id='Pull_DCM_Report',
    bash_command='python "/Users/run_report.py" 2737542 134267867', dag=dag)

t2 = BashOperator(
    task_id='Cleanse_File',
    bash_command='python "/Users/cleanse_file.py"',dag=dag)

t3 = BashOperator(
    task_id='S3_Bucket_Creation_Upload_File',
    bash_command='python "/Users/aws_s3_creation&load.py"',dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)

Ответы [ 2 ]

0 голосов
/ 04 июня 2018

Попробуйте без логики повтора и посмотрите, как это работает.Используйте эти аргументы по умолчанию и информацию о тегах:

`default_args = {
'owner': 'David',
'depends_on_past': False,
'start_date': datetime(2018,5,18),
'email': ['email_address'],
'email_on_failure': True,
'email_on_retry': True
}

dag = DAG(
 dag_id='DCM_Floodlight_Report_API',
 default_args=default_args,
 catchup=False,
 description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
 schedule_interval='30 14 * * *')

Я добавил catchup, установил значение False и изменил depends_on_past на False.Я также удалил логику повторов.Это может решить вашу проблему - пожалуйста, дайте мне знать!

0 голосов
/ 04 июня 2018

Я не думаю, что время выполнения вашей задачи является проблемой.- такое поведение наиболее вероятно обусловлено параметром catchup, который по умолчанию равен True.

https://airflow.apache.org/scheduler.html#backfill-and-catchup

Это означает, что Airflow планирует первое задание для каждого интервала расписания междуваш start_date и текущее время.Вы можете просмотреть свой Tree View в пользовательском интерфейсе, чтобы увидеть, запланировано ли более одного DagRun.Если вы просто тестируете свою группу обеспечения доступности баз данных, я бы порекомендовал установить для schedule_interval значение @once, прежде чем планировать его запуск на даты в прошлом или будущем.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...