Требование: запускать SQL-запрос для каждой даты, используя цикл while. Например: дата начала выбрана 25 августа, а дата окончания 28 августа. Затем BigQueryOperator сначала запускается 25 августа, затем 26 августа и так далее, пока не достигнем 28 августа.
Проблема: в DAG ниже он только выполняет запрос на дату начала, а затем завершает работу. Он даже не выполняет / итерацию BigQueryOperator до следующей даты и так далее.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import date, datetime, timedelta
import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime.datetime(2018, 8, 31),
'email': ['xyz@xyz.com'],
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=10),
'depends_on_past': False
}
dag = DAG('his_temp',default_args=default_args,schedule_interval=None)
date1 = datetime.date(2018, 8, 25)
date2 = datetime.date(2018, 8, 28)
day = datetime.timedelta(days=1)
while date1 <= date2:
parameter = {
'dataset': "projectname.finance",
'historical_date': date1.strftime('%Y%m%d')
}
sqlpartition = BigQueryOperator(
task_id='execute_sqlpartition',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql="sqlqueries/sqlpartition.sql",
destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'),
params=parameter,
bigquery_conn_id='bigquery',
dag=dag)
print "data loaded for "+ parameter.get('historical_date')
date1 = date1 + day