Я новичок в области воздушных потоков и пытаюсь выяснить, как передать дату выполнения группы DAG для каждой задачи. У меня есть это в моей группе DAG:
tzinfo=tz.gettz('America/Los_Angeles')
dag_run_date = datetime.now(_tzinfo)
dag = DAG(
'myDag',
default_args=default_args,
schedule_interval = None,
params = {
"runDateTimeTz" : dag_run_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
}
)
Затем я пытаюсь передать параметр runDateTimeTz в каждая из моих задач, что-то вроде этого ..
task1 = GKEPodOperator(
image='gcr.io/myJar:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar.jar", {{params.runDateTimeTz}}"],
dag=dag)
task2 = GKEPodOperator(
image='gcr.io/myJar2:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar2.jar", {{params.runDateTimeTz}}"],
dag=dag)
Мои задачи выполняются правильно, но я ожидал, что все они получат одинаковую дату выполнения в params.runDateTimeTz, но этого не произошло, для пример task1 получает params.runDateTimeTz=2020-04-16T07:42:47.412716-07:00
, а task2 получает params.runDateTimeTz= 2020-04-16T07:43:29.913289-07:00
Я полагаю, это поведение связано с тем, как воздушный поток заполняет params
для DAG, похоже, что params.runDateTimeTz получается только тогда, когда задача начинает запустить, но я хочу получить его раньше и отправить его каждой задаче в качестве аргумента, ожидая, что все задачи получат одинаковое значение.
Может ли кто-нибудь помочь мне в том, что я делаю неправильно?