Как добиться более сложного планирования DAG с потоком воздуха? - PullRequest
0 голосов
/ 15 апреля 2019

Воздушный поток позволяет легко выполнять задания с фиксированными интервалами. В этом посте содержится совет о том, как справиться с более сложным требованием планирования.

Например, предположим, у меня есть процесс, который извлекает файлы с SFTP-сервера и что-то с ними делает. Источник только публикует файлы M-F. Я хочу, чтобы даг вел себя следующим образом:

  • пробег только M-F;
  • по понедельникам, искать файлы из execution_date - 0 и - 1 и - 2
  • вт-пт, просто поищите execution date - 0?

Кажется, это не практично для реализации, и мне нужно просто разработать его так, чтобы он извлекал все файлы, которые там были, и запускался каждый день, без ссылки на конкретные файлы.

Дело в том, что, если я могу указать файлы, управляемые execution_date, тогда я смогу точно видеть, что было извлечено и не получено, и использовать функцию повтора.

Один способ, который приходит на ум, - это создать 7 дагов с еженедельным расписанием. Но мне не нравится эта идея.

Другой случай был бы, если бы я хотел, чтобы процесс запускался каждое второе воскресенье месяца. Есть ли способ сделать такое?

EDIT: Я думаю, что самым чистым способом для достижения этой цели будет проект dag, который будет всегда извлекать файлы с датой execution_date, но только чтобы не запускать спутниковую и солнечную системы до понедельника (и использовать для этого оператор триггера dag), и использовать контроллер Dag с BranchOperator и TriggerDagOperator для достижения этой цели.

Ответы [ 2 ]

2 голосов
/ 15 апреля 2019

Установите для группы DAG 'schedule_interval': значение '0 0 * * 1-5' в 00:00 каждого дня недели с понедельника по пятницу.Отрегулируйте время по мере необходимости (первые два нуля).

Далее, используйте BranchPythonOperator как способ ввода DAG.По понедельникам DAG выполняется и ищет файл с execution_date - 0, execution_date - 1 и execution_date - 2.Со вторника по пятницу, он просто ищет execution_date - 0.

Я создал быстрый пример, чтобы показать вам, что я имею в виду.Я надеюсь, что это достаточный пример.Дайте мне знать, если я смогу помочь.

#Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
# General imports
from datetime import datetime

DAG_ID = 'stackoverflow_exampledag'

args = {
    'owner': 'you',
    'email': ['you@yourwork.com'],
    'depends_on_past': False,
    'email_on_retry': False,
    'email_on_failure': True,
    'start_date': datetime(2019, 4, 14)
}

dag = DAG(
    dag_id=DAG_ID,
    default_args=args,
    schedule_interval="0 0 * * 1-5"
    )


#################################
######## Python Script ##########
#################################


def checktheday(**kwargs):
    weekday = datetime.today().weekday()
    if weekday == 1:
        return 'monday_only_task'
    else:
        return 'tuesday_through_friday_task'


####################################
########## TASKS ###################
####################################

# BranchPythonOperator is the entry point for this DAG.
# The python callable will return the task id of the appriorate subdag/task that it's supposed to run.

checktheday_task = BranchPythonOperator(
    task_id='checktheday_task',
    python_callable=checktheday,
    dag=dag,
    provide_context=True
    )

monday_only_task = DummyOperator(
    task_id='monday_only_task',
    dag=dag
    )

tuesday_through_friday_task = DummyOperator(
    task_id='tuesday_through_friday_task',
    dag=dag


#################################
########## ORCHESTRATION ########
#################################
monday_only_task.set_upstream(checktheday_task)
tuesday_through_friday_task.set_upstream(checktheday_task)

example graph view

0 голосов
/ 17 апреля 2019

Ответ Зака ​​был полезен в решении этой проблемы (поскольку оператор ветвления был необходим), но решение, с которым я собираюсь, заключается в использовании TriggerDagRunOperator.

Вот даги, которые я построил для проверки этого подхода.

Целевой перевод

def alert(ti, **kwargs):
    message = f"Execution date is {ti.execution_date}"
    print(message)

with target_dag:
    PythonOperator(
        python_callable=alert,
        task_id='target_task',
        provide_context=True,
    )

Запуск триггера

def check_day(ti, **kwargs):
    execution_date = ti.execution_date
    if execution_date.minute % 7 == 0:
        return ['weekday_trigger', 'saturday_trigger', 'sunday_trigger']
    elif execution_date.minute % 7 in range(1, 5):
        return ['weekday_trigger']
    else:
        return []

with trigger_dag:
    check_day_task = BranchPythonOperator(
        task_id='check_day_task',
        python_callable=check_day,
        provide_context=True,
    )

    weekday_trigger = TriggerDagRunOperator(
        task_id='weekday_trigger',
        trigger_dag_id='target_dag',
        execution_date='{{ execution_date }}'
    )
    saturday_trigger = TriggerDagRunOperator(
        task_id='saturday_trigger',
        trigger_dag_id='target_dag',
        execution_date='{{ execution_date + macros.timedelta(days=-1) }}'
    )
    sunday_trigger = TriggerDagRunOperator(
        task_id='sunday_trigger',
        trigger_dag_id='target_dag',
        execution_date='{{ execution_date + macros.timedelta(days=-2) }}'
    )

    check_day_task >> [weekday_trigger, saturday_trigger, sunday_trigger]

Почему бы просто не использовать оператор ветвления?

Причина, по которой я предпочитаю этот подход, заключается в том, что мой целевой даг не должен заботиться о сложном планировании.Все, что нужно заботиться, это дата исполнения.Бывает, что по понедельникам мы хотим выполнить execution_date - 1 и execution_date - 2 в дополнение к execution_date.Но целевой dag работает одинаково, несмотря ни на что: он делает определенную вещь, основываясь на execution_date.

Если я попытаюсь включить оператор перехода в целевой dag, он очень быстро запутается.Например, если ваш целевой даг имеет 4 задания, вам нужно дублировать эти 2 раза по понедельникам.Кроме того, древовидное представление истории запусков dag было бы уродливым, с большим количеством пропущенных задач и обратной засыпкой, вероятно, было бы странно.В понедельник по будням наш триггер dag вызывает target_dag с той же датой выполнения, что и триггер dag.На выходных спусковой крючок не срабатываетИ в понедельник, он выполняет 3 прогона target_dag, для понедельника и предыдущих двух дней.

Примечание. Я использовал минуты для имитации дней при тестировании запланированных прогонов.

Вот график с триггером: dag: graph view of trigger dag

Деревовид основной задачи остается чистым и простым: tree view of target dag

...