Хочу установить дату выполнения в триггере DAG.Я использую оператор TriggerDagRunOperator, у этого оператора есть параметр execute_date, я хочу установить текущую дату исполнения.
def conditionally_trigger(context, dag_run_obj):
"""This function decides whether or not to Trigger the remote DAG"""
pp = pprint.PrettyPrinter(indent=4)
c_p = Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1"
print("Controller DAG : conditionally_trigger = {}".format(c_p))
if Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1":
pp.pprint(dag_run_obj.payload)
return dag_run_obj
default_args = {
'owner': 'pepito',
'depends_on_past': False,
'retries': 2,
'start_date': datetime(2018, 12, 1, 0, 0),
'email': ['xxxx@yyyyy.net'],
'email_on_failure': False,
'email_on_retry': False,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'DAG_1',
default_args=default_args,
schedule_interval="0 12 * * 1",
dagrun_timeout=timedelta(hours=22),
max_active_runs=1,
catchup=False
)
trigger_dag_2 = TriggerDagRunOperator(
task_id='trigger_dag_2',
trigger_dag_id="DAG_2",
python_callable=conditionally_trigger,
execution_date={{ execution_date }},
dag=dag,
pool='a_roz'
)
Но я получаю следующую ошибку
name 'execute_date'не определено
Если я установлю
execution_date={{ 'execution_date' }},
или
execution_date='{{ execution_date }}',
Я получу
Traceback (самая последняяпоследний вызов):
Файл "/usr/local/lib/python3.6/site-packages/airflow/models.py", строка 1659, в _run_raw_task
result = task_copy.execute(context = context)
Файл "/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", строка 78, в файле execute
replace_microseconds =Неверно)
Файл "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", строка 98, в trigger_dag
replace_microseconds= replace_microseconds,
Файл "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", строка 45, в _trigger_dag
assert timezone.is_localized (execute_date)
Файл "/usr/local/lib/python3.6/site-packages/airflow/utils/timezone.py", строка 38, вis_localized
return value.utcoffset () не является None
AttributeError: у объекта 'str' нет атрибута 'utcoffset'
Кто-нибудь знает, как я могу установитьдата выполнения для DAG_2, если я хочу быть равной DAG_1?
Этот вопрос отличается от воздушного потока TriggerDagRunOperator, как изменить дату выполнения , т.к. в этом посте не объясняется, как отправлятьexecute_date через оператор TriggerDagRunOperator, в нем только сказано, что такая возможность существует.https://stackoverflow.com/a/49442868/10269204