Как установить динамическое имя для job_flow_overrides в Airflow EmrCreateJobFlowOperator? - PullRequest
1 голос
/ 27 сентября 2019

Я пытаюсь настроить процесс AWS EMR в Airflow, и мне нужно, чтобы job_flow_overrides в EmrCreateJobFlowOperator и steps в EmrAddStepsOperator были установлены отдельными файлами JSON, расположенными в другом месте.

Я пробовал множество способов как напрямую связать файлы JSON, так и установить и получить переменные воздушного потока для JSON.Если бы я использовал переменные воздушного потока, их также нужно было бы динамически называть, с чем у меня проблемы.Я могу легко Variable.set динамическое имя с помощью PythonOperator, но не могу Variable.get динамическое имя в job_flow_overrides или steps из-за ограничений Airflow по написанию кода Python вне PythonOperator.

Переменные воздушного потока уже были установлены ранее в коде, ниже мой код пытается использовать данные JSON и настроить кластер

def get_global_json_contents():
    return json.dumps(requests.get("PATH/TO/JSON/FILE").json())

# Use the 'Name' Key in this JSON as a specific identifier for the Variables created by this job
def get_global_json_name():
    return json.loads(get_global_json_contents())['Name']

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    job_flow_overrides=json.loads(Variable.get("CLUSTER_SETUP-"+get_global_json_name())),
    dag=dag
)

add_steps = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=json.loads(Variable.get("RUN_STEPS-"+get_global_json_name())),
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

Кто-нибудь знает, как я могу заставить этот процесс работать?

У кого-нибудь есть идеи, как обойти ограничения невозможности использования функций Python в Airflow за пределами PythonOperator и python_callable?

Возможно ли решить эту проблему?определив функции в отдельном файле Python, расположенном в другом месте, и импортировав его в Airflow?И если так, как бы я поступил так?

1 Ответ

1 голос
/ 27 сентября 2019

Я не совсем понял проблему здесь, но из собранных мной битов я могу дать некоторые идеи


A.Если динамизм относится к часто (раз в несколько дней) меняющихся имен файлов

Сохраните имя файла в потоке воздуха Variable (вы, кажется, уже поняли это)

  • ссылается на имя файла из переменной воздушного потока (вместо жесткого кодирования в файле определения dag)
  • и когда возникает необходимостьВы можете обновить имя файла (переменную) через сам WebUI

B.Если dynamicism означает , каждая задача / dag-run должна обрабатывать отдельный файл , то вот параметры

  1. Внешняя система, которая генерирует этиФайлы JSON также могут обновлять вышеупомянутую переменную воздушного потока, чтобы ваши задачи могли выбрать правильное имя файла.Мне лично не нравится этот подход, так как внешняя система должна учитывать Airflow, а также она склонна к ошибкам

  2. Если возможно, задачи внешней системы и Airflow должны соответствоватьнекоторые соглашения об именах файлов , так что нам даже не нужно сообщать точное имя файла задаче (задача уже знает, как определить имя файла).Например, вы можете иметь свой файл с именем {execution_date}-{dag_id}-{env}-{task_id}.json.Этот подход относительно хорош, но только в том случае, если шаблонное имя файла подходит для вашей системы (что может быть не так)

  3. Задача восходящего потока, которая получает точное имя файла, может передать эту информацию в последующую задачу через XCOM .Этот подход является наиболее надежным из всех


Что касается вашего запроса

У кого-нибудь есть идеи, как обойти ограничения неспособностииспользовать функции Python в Airflow за пределами PythonOperator и python_callable?

Я думаю, что для решения этой проблемы требуется лишь некоторое привыкание (и немного творчества) с фреймворком Airflow.Вы всегда можете создать подкласс любого оператора воздушного потока (включая EmrCreateJobFlowOperator), а затем переопределить методы pre_execute(), execute(), & post_execute() и добавить туда свою собственную логику


Рекомендуемые чтения


EDIT-1

, отвечая на вопросы, заданные в комментариях

Я не смог заставить job_flow_overrides принять XCOM в качестве входных данных.Знаете ли вы, как это исправить?

Зная, что job_flow_overrides является полем шаблона из EmrCreateJobFlowOperator (разработчики Airflow имели в виду ваш сценарий использования), вы можете использовать JINJA с шаблонами для тянуть его через XCOM

import json
my_dag = DAG(dag_id="my_dag",
             ..
             user_defined_macros={
                 'json': json
             }
             ..
             )
create_emr_task = EmrCreateJobFlowOperator(dag_id="my_dag",
                                           task_id="create_emr_task",
                                           ..
                                           job_flow_overrides="{{ json.loads(ti.xcom_pull(task_ids='my_xcom_pusher_task')) }}"
                                           ..)

есть ли у вас примеры использования / перезаписи методов execute ()?

Что ж, теперь это чистый python вопрос.Если вы когда-либо занимались объектно-ориентированным программированием, особенно наследованием , вы почувствуете себя здесь как дома.

  • Это включает в себя просто создание подкласса BaseOperator (или любой его подкласс) и определение пользовательских функций в функции execute()
  • Например, PythonOperator также расширяет BaseOperator

Ссылки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...