Как запустить subdags с другим интервалом расписания? - PullRequest
1 голос
/ 09 июля 2019

У меня есть DAG Airflow, которая состоит из следующих задач:

  1. добавление CSV в промежуточную таблицу (t1)
  2. удалить старую запись из основной таблицы (t2)
  3. добавить последние данные в таблицу (t3)

и один подзаголовок, который запускается каждый день в конце дня (23.59 или 23.59 в 24-часовом формате). Сначала должны сработать первые три задачи, затем будут запущены поддаги

t1 >> t2 >> t3 >> subdag

Проблема в том, что первые 3 задачи работают хорошо, но не подзадача. Сначала я перезагружаюсь, чем выдаю флаг ошибки. Я также не могу проверить, где и почему происходит ошибка.

Я попытался переопределить schedule_interval из моего подпадала, чтобы получить ожидаемый результат, с 12 * * * * до 59 12 * * *. Я тоже пытаюсь из этого поста в блоге тоже https://medium.com/handy-tech/airflow-tips-tricks-and-pitfalls-9ba53fba14eb:

Это код dag default_dag_args:

DAG_NAME = 'order_bid'

...
default_dag_args = {
    'start_date': start_date,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'project_id': models.Variable.get('XXXXXXXXXXX')
}

Это пример объявления задачи:

task_add_order_bid = bigquery_operator.BigQueryOperator(
    task_id='add_order_bid',
    bql=order_bid.sql_itop_order_bid.format(
        table_order_bid_stg=table_order_bid_stg,
        date_from=date_from.strftime("%Y-%m-%d")
        ),
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    destination_dataset_table=table_order_bid,
    dag=dag,

)

Это поддаг, который я пытаюсь объявить:

subdag_daily_bid = SubDagOperator(
    subdag=daily.dailyBidding(
        DAG_NAME,
        "daily_order_bid",
        start_date,
        dt_wib),
    task_id="daily_order_bid",
dag = dag)

Но когда я хочу изменить расписание, я объявляю свой поддаг следующим образом:

def dailyBidding(parent_dag, child_dag, start_date, task_date):
    dag = models.DAG(
        '%s.%s' % (parent_dag, child_dag),
        schedule_interval='59 12 * * *',
        start_date=start_date
        )

    date_from = task_date - timedelta(days=1)

    task_del_taxi_order_bid_daily = bigquery_operator.BigQueryOperator(
    task_id='del_daily_order_bid',
    bql=sql_del_partition_order_bid_daily.format(
        table_order_bid=table_order_bid_master,
        date_from=date_from.strftime("%Y-%m-%d")),
    use_legacy_sql=False,
    dag=subdag)

task_add_daily_order_bid = bigquery_operator.BigQueryOperator(
    task_id='add_daily_order_bid',
    bql=daily.sql_add_daily.format(
        source = table_order_bid_master,
        yesterday = date_from.strftime("%Y-%m-%d"),
        monthly = a_month_ago.strftime("%Y-%m-%d")),
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    destination_dataset_table=table_daily_order_bid,
    dag=subdag)


task_del_taxi_order_bid_daily >> task_add_daily_order_bid

    return dag

Я ожидаю, что мой dag работает в 12.59, но он все еще ждет, чтобы бежать и следовать расписанию от родителей.

...