Передача аргумента словаря Dynami c в оператор в Airflow на основе ввода пользователя - PullRequest
0 голосов
/ 22 апреля 2020

Привет, поэтому я пытался найти способ изменить и передать аргумент словаря моему оператору на основе входных данных, которые я получаю из вышеуказанных задач. Например, здесь «кластер» - это аргумент словаря, загруженный до инициализации dag

run = CustomOperator(
            task_id='task',
            new_cluster=cluster,
            ...,

здесь «кластер» - это значение словаря. изменив его в задаче и вернув его, мне придется использовать xcom, который возвращает строку, даже когда я делаю "{{(xcom) | to json}}", я получаю значение в одинарных кавычках, а не дикт.

ЕСЛИ я изменяю предварительно загруженный словарь непосредственно перед оператором, как показано ниже, он работает, но только если изменение установлено c. Но я хочу, чтобы измененное значение было тем, которое я выбираю из файла на основе ввода в dag из dagRun.conf или возвращаемого значения предыдущей задачи {{xcom_pull}}, оно выдает ошибку при компиляции DAG с надписью BROKEN DAG: ' {{dagRun.conf}} ':

cluster['value a'] = file_change["{{dag_run.conf}}"]
run = CustomOperator(
            task_id='task',
            new_cluster=cluster,
            ...,

Я мог бы работать, даже если бы мог вызвать функцию, которая возвращает назад, чтобы получить аргумент:

def modify(cluster):
....
...
..

with dag:
run = CustomOperator(
            task_id='task',
            new_cluster=modify(cluster),
            ...

ранняя помощь будет быть полезным.

1 Ответ

0 голосов
/ 23 апреля 2020

Важно отметить, что объект models.DAG ( результат dag = DAG(...) ): создается / инициализируется при загрузке файла с процессом , а не когда последующий объект выполняется в контексте прогона. Вы не можете получить доступ к чему-либо, связанному с прогоном, потому что ничего еще не существует.

Я думаю, что вы находитесь на правильном пути в отношении передачи его через XCom, в зависимости от размера, и если это может быть сделано каким-то другим Кстати, но я думаю, что вы также пытаетесь использовать его как шаблонное поле, когда это не так - обновите ваш вопрос с кодом для более подробной информации, пожалуйста.

...