Как использовать While Loop для выполнения оператора Airflow - PullRequest
0 голосов
/ 31 августа 2018

Требование: запускать SQL-запрос для каждой даты, используя цикл while. Например: дата начала выбрана 25 августа, а дата окончания 28 августа. Затем BigQueryOperator сначала запускается 25 августа, затем 26 августа и так далее, пока не достигнем 28 августа.

Проблема: в DAG ниже он только выполняет запрос на дату начала, а затем завершает работу. Он даже не выполняет / итерацию BigQueryOperator до следующей даты и так далее.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import date, datetime, timedelta
import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2018, 8, 31),
    'email': ['xyz@xyz.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
    'depends_on_past': False
}

dag = DAG('his_temp',default_args=default_args,schedule_interval=None)

date1 = datetime.date(2018, 8, 25)
date2 = datetime.date(2018, 8, 28)
day = datetime.timedelta(days=1)

while date1 <= date2:
    parameter = {
        'dataset': "projectname.finance",
        'historical_date': date1.strftime('%Y%m%d')
    }


    sqlpartition = BigQueryOperator(
    task_id='execute_sqlpartition',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql="sqlqueries/sqlpartition.sql",
    destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'),
    params=parameter,
    bigquery_conn_id='bigquery',
    dag=dag)

    print "data loaded for "+ parameter.get('historical_date')

    date1 = date1 + day   

1 Ответ

0 голосов
/ 01 сентября 2018

Вся концепция планировщика воздушного потока заключается в том, что он будет планировать задачи, и вам просто нужно правильно его настроить. Неудивительно, что он запускается один раз, так как будет указана дата начала, указанная в качестве даты начала dag, а поскольку ежедневное задание не запланировано, оно будет выполнено один раз и остановится. Вы должны настроить на уровне dag, а не на уровне оператора.

Обратитесь к документации для правильной синхронизации и интервал планирования Планировщик документов

...