Привет, поэтому я пытался найти способ изменить и передать аргумент словаря моему оператору на основе входных данных, которые я получаю из вышеуказанных задач. Например, здесь «кластер» - это аргумент словаря, загруженный до инициализации dag
run = CustomOperator(
task_id='task',
new_cluster=cluster,
...,
здесь «кластер» - это значение словаря. изменив его в задаче и вернув его, мне придется использовать xcom, который возвращает строку, даже когда я делаю "{{(xcom) | to json}}", я получаю значение в одинарных кавычках, а не дикт.
ЕСЛИ я изменяю предварительно загруженный словарь непосредственно перед оператором, как показано ниже, он работает, но только если изменение установлено c. Но я хочу, чтобы измененное значение было тем, которое я выбираю из файла на основе ввода в dag из dagRun.conf или возвращаемого значения предыдущей задачи {{xcom_pull}}, оно выдает ошибку при компиляции DAG с надписью BROKEN DAG: ' {{dagRun.conf}} ':
cluster['value a'] = file_change["{{dag_run.conf}}"]
run = CustomOperator(
task_id='task',
new_cluster=cluster,
...,
Я мог бы работать, даже если бы мог вызвать функцию, которая возвращает назад, чтобы получить аргумент:
def modify(cluster):
....
...
..
with dag:
run = CustomOperator(
task_id='task',
new_cluster=modify(cluster),
...
ранняя помощь будет быть полезным.