Не уверен, что вы имеете в виду для «динамического», но при обновлении файла yaml, если процесс чтения файла находится в теле файла dag, dag будет обновлен для применения новых аргументов из файла yaml. Так что на самом деле вам не нужен XCOM для получения аргументов.
просто создайте словарь параметров, затем перейдите к default_args:
CONFIGFILE = os.path.join(
os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')
with open(CONFIGFILE, 'r') as ymlfile:
CFG = yaml.load(ymlfile)
default_args = {
'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
'project_id': CFG['section_A']['project_id'],
'zone': CFG['section_A']['zone'],
'mage_version': CFG['section_A']['image_version'],
'num_workers': CFG['section_A']['num_workers'],
'worker_machine_type': CFG['section_A']['worker_machine_type'],
# you can add all needs params here.
}
DAG = DAG(
dag_id=DAG_NAME,
schedule_interval=SCHEDULE_INTEVAL,
default_args=default_args, # pass the params to DAG environment
)
Task1 = DataprocClusterCreateOperator(
task_id='your_task_id',
dag=DAG
)
Но если вам нужны динамические символы, а не аргументы, вам может понадобиться другая стратегия, например this .
Так что вам, вероятно, нужно выяснить основную идею:
На каком уровне находится динамика? Уровень задачи? Уровень DAG?
Или вы можете создать своего собственного оператора, который будет выполнять работу и принимать параметры.