Как передать дату выполнения DAG задачам? - PullRequest
1 голос
/ 17 апреля 2020

Я новичок в области воздушных потоков и пытаюсь выяснить, как передать дату выполнения группы DAG для каждой задачи. У меня есть это в моей группе DAG:

tzinfo=tz.gettz('America/Los_Angeles')
dag_run_date = datetime.now(_tzinfo)

dag = DAG(
    'myDag', 
    default_args=default_args,
    schedule_interval = None,
    params = {
        "runDateTimeTz" : dag_run_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
    }
)

Затем я пытаюсь передать параметр runDateTimeTz в каждая из моих задач, что-то вроде этого ..

task1 = GKEPodOperator(
    image='gcr.io/myJar:1.0.1.45',           
    cmds=['java'],
    arguments=["-jar","myJar.jar", {{params.runDateTimeTz}}"],
    dag=dag)

task2 = GKEPodOperator(
    image='gcr.io/myJar2:1.0.1.45',           
    cmds=['java'],
    arguments=["-jar","myJar2.jar", {{params.runDateTimeTz}}"],
    dag=dag)

Мои задачи выполняются правильно, но я ожидал, что все они получат одинаковую дату выполнения в params.runDateTimeTz, но этого не произошло, для пример task1 получает params.runDateTimeTz=2020-04-16T07:42:47.412716-07:00, а task2 получает params.runDateTimeTz= 2020-04-16T07:43:29.913289-07:00

Я полагаю, это поведение связано с тем, как воздушный поток заполняет params для DAG, похоже, что params.runDateTimeTz получается только тогда, когда задача начинает запустить, но я хочу получить его раньше и отправить его каждой задаче в качестве аргумента, ожидая, что все задачи получат одинаковое значение.

Может ли кто-нибудь помочь мне в том, что я делаю неправильно?

1 Ответ

0 голосов
/ 17 апреля 2020

Вы можете использовать execution_date или ds из макросов Airflow:

Подробности: https://airflow.apache.org/docs/stable/macros-ref#default -переменные

task1 = GKEPodOperator(
    image='gcr.io/myJar:1.0.1.45',           
    cmds=['java'],
    arguments=["-jar","myJar.jar", "{{ ds }}"],
    dag=dag)

task2 = GKEPodOperator(
    image='gcr.io/myJar2:1.0.1.45',           
    cmds=['java'],
    arguments=["-jar","myJar2.jar", "{{ ds }}"],
    dag=dag)

Если вам нужно метка времени вы можете использовать {{ ts }}

...