параметр командной строки воздушного потока для запуска adhoc? - PullRequest
0 голосов
/ 01 июля 2019

Мы только начали включать поток воздуха для планирования. Один из моих сценариев запускается ежедневно. Он использует параметр шаблона ({{ds_nodash}}) для получения дат. Но я должен перезапустить для однодневной загрузки (дата прошлого), как я могу предоставить входной параметр. Входной параметр переопределит ds_nodash.

I have :
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }} " 

Would like to run for 
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade **20190601** " 

Фрагмент кода ниже:

import os
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 19),
    'email': ['sanjeeb@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('create-data-set-job', default_args=default_args)
projct_dr='/home/airflow/projects/'

trade_acx_ld="/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh" 
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }} " 


t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

if os.path.exists(trade_acx_ld):
   t2 = BashOperator(
        task_id= 'Dataset_create',
        bash_command=trade_acx_ld_cmd,
        dag=dag
   )
else:
    raise Exception("Cannot locate {0}".format(trade_acx_ld_cmd))

t2.set_upstream(t1)

1 Ответ

0 голосов
/ 01 июля 2019

Вы можете просто вызвать DAG вручную, используя airflow trigger_dag.{{ ds_nodash }} займет execution_date, поэтому, если вы активируете группу обеспечения доступности баз данных со старой датой выполнения, {{ ds_nodash }} будет использовать более старую execution_date вместо сегодняшней даты.

Вы можете передать execution_date вtrigger_dag команда следующим образом.

airflow trigger_dag gcs-file-load-job -e "2019-01-01"
...