Apache Airflow - Как установить execute_date с использованием TriggerDagRunOperator в целевой группе доступности базы данных для использования текущей даты execute_date - PullRequest
0 голосов
/ 14 декабря 2018

Хочу установить дату выполнения в триггере DAG.Я использую оператор TriggerDagRunOperator, у этого оператора есть параметр execute_date, я хочу установить текущую дату исполнения.

def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    pp = pprint.PrettyPrinter(indent=4)
    c_p = Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1"
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1":
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj

default_args = {
    'owner': 'pepito',
    'depends_on_past': False,
    'retries': 2,
    'start_date': datetime(2018, 12, 1, 0, 0),
    'email': ['xxxx@yyyyy.net'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'DAG_1',
    default_args=default_args,
    schedule_interval="0 12 * * 1",
    dagrun_timeout=timedelta(hours=22),
    max_active_runs=1,
    catchup=False
)

trigger_dag_2 = TriggerDagRunOperator(
    task_id='trigger_dag_2',
    trigger_dag_id="DAG_2",
   python_callable=conditionally_trigger,
    execution_date={{ execution_date }},
   dag=dag,
   pool='a_roz'
)

Но я получаю следующую ошибку

name 'execute_date'не определено

Если я установлю

execution_date={{ 'execution_date' }},

или

execution_date='{{ execution_date }}',

Я получу

Traceback (самая последняяпоследний вызов):

Файл "/usr/local/lib/python3.6/site-packages/airflow/models.py", строка 1659, в _run_raw_task

result = task_copy.execute(context = context)

Файл "/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", строка 78, в файле execute

replace_microseconds =Неверно)

Файл "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", строка 98, в trigger_dag

replace_microseconds= replace_microseconds,

Файл "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", строка 45, в _trigger_dag

assert timezone.is_localized (execute_date)

Файл "/usr/local/lib/python3.6/site-packages/airflow/utils/timezone.py", строка 38, вis_localized

return value.utcoffset () не является None

AttributeError: у объекта 'str' нет атрибута 'utcoffset'

Кто-нибудь знает, как я могу установитьдата выполнения для DAG_2, если я хочу быть равной DAG_1?

Этот вопрос отличается от воздушного потока TriggerDagRunOperator, как изменить дату выполнения , т.к. в этом посте не объясняется, как отправлятьexecute_date через оператор TriggerDagRunOperator, в нем только сказано, что такая возможность существует.https://stackoverflow.com/a/49442868/10269204

Ответы [ 2 ]

0 голосов
/ 27 марта 2019

вы можете попробовать

from datetime import datetime, timezone
execution_date=datetime(2019, 3, 27, tzinfo=timezone.utc)
execution_date=datetime.now().replace(tzinfo=timezone.utc)

, кроме того, он уже настроен, но проверьте версию, которую вы используете commit

0 голосов
/ 15 декабря 2018

Вы получаете эту ошибку, потому что execution_date в TriggerDagRunOperator не является шаблонным полем.Он ожидает объект datetime.

Пример:

import datetime
execution_date = datetime.datetime.now()

Затем вы можете использовать эту переменную для передачи execution_date в TriggerDagRunOperator

...