Возможно ли, чтобы DAG обнаружил первый запуск в определенную дату в Airflow? - PullRequest
0 голосов
/ 16 января 2019

У меня DAG работает каждые 30 минут.

Скажите, что это DAG (для простоты используются фиктивные операторы):

dag = DAG(
    dag_id='My_dag',
    default_args=args,
    schedule_interval=timedelta(minutes=30),
    max_active_runs=1,
    catchup=False,
)
start = DummyOperator(task_id='start_task', dag=dag)
to_do = DummyOperator(task_id='to_do_task ', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)

start >> to_do >> end

Теперь, один раз в день я хочу добавить другого оператора в рабочий процесс, который будет выполняться только при первом запуске в этот день .

Скажите, что это:

once = DummyOperator(task_id='once_task ', dag=dag)
start >> once

означает, что once должен выполняться один раз каждые 24 часа, а остальные должны быть пропущены.

Я не могу сделать это с PythonBranchOperator, как я не могу что-то вроде:

if execution_date == midnigt

потому что я не знаю, когда произойдет первое казнь. Это может быть 00:01, и это может быть 00:17 и т. Д.

Есть ли способ проверить, является ли это первым запуском за дату выполнения? Я звучу как TimeSensor, но я не мог найти, как это сделать с документами. Можно ли совать тот же DAG?

1 Ответ

0 голосов
/ 16 января 2019

Вы можете проверить предыдущую дату выполнения (макрос prev_ds) и сравнить ее с текущей датой выполнения (макрос ds) в BranchPythonOperator.Пример:

start = DummyOperator(task_id='start_task', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
once = DummyOperator(task_id='once_task', dag=dag)
dummy_task_id_that_does_nothing = DummyOperator(task_id='dummy_task_id_that_does_nothing', dag=dag)

def check_if_task_already_ran(**context):
    ds = context.get('ds')
    prev_ds = context.get('prev_ds')

    pprint(context)
    print(ds)
    print(prev_ds)

    if prev_ds == ds:
        return dummy_task_id_that_does_nothing
    else:
        return once_task    # Task that would just be executed once in a day


compare_ds = BranchPythonOperator(
    task_id='compare_ds',
    provide_context=True,
    python_callable=check_if_task_already_ran,
    dag=dag)


start >> compare_ds
compare_ds >> once >> end
compare_ds >> dummy_task_id_that_does_nothing >> end
...