Как передать динамические аргументы оператору воздушного потока? - PullRequest
3 голосов
/ 22 марта 2019

Я использую Airflow для запуска заданий Spark в Google Cloud Composer.Мне нужно

  • Создать кластер (параметры YAML предоставлены пользователем)
  • список заданий зажигания (параметры задания также предоставляются для каждого задания YAML)

С Airflow API - я могу читать файлы YAML и передавать переменные в задачи с помощью xcom.

Но рассмотрим DataprocClusterCreateOperator()

  • cluster_name
  • project_id
  • zone

и несколько других аргументов помечены как шаблонные.

Что если я хочу передать другие аргументы как шаблонные (которые в настоящее время не так)?- как image_version, num_workers, worker_machine_type и т. д.?

Есть ли обходной путь для этого?

1 Ответ

4 голосов
/ 23 марта 2019

Не уверен, что вы имеете в виду для «динамического», но при обновлении файла yaml, если процесс чтения файла находится в теле файла dag, dag будет обновлен для применения новых аргументов из файла yaml. Так что на самом деле вам не нужен XCOM для получения аргументов. просто создайте словарь параметров, затем перейдите к default_args:

CONFIGFILE = os.path.join(
    os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')

with open(CONFIGFILE, 'r') as ymlfile:
    CFG = yaml.load(ymlfile)

default_args = {
    'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
    'project_id': CFG['section_A']['project_id'],
    'zone': CFG['section_A']['zone'],
    'mage_version': CFG['section_A']['image_version'],
    'num_workers': CFG['section_A']['num_workers'],
    'worker_machine_type': CFG['section_A']['worker_machine_type'],
    # you can add all needs params here.
}

DAG = DAG(
    dag_id=DAG_NAME,
    schedule_interval=SCHEDULE_INTEVAL,
    default_args=default_args, # pass the params to DAG environment
)

Task1 = DataprocClusterCreateOperator(
    task_id='your_task_id',
    dag=DAG
)

Но если вам нужны динамические символы, а не аргументы, вам может понадобиться другая стратегия, например this .

Так что вам, вероятно, нужно выяснить основную идею: На каком уровне находится динамика? Уровень задачи? Уровень DAG?

Или вы можете создать своего собственного оператора, который будет выполнять работу и принимать параметры.

...