Я пытаюсь настроить процесс AWS EMR в Airflow, и мне нужно, чтобы job_flow_overrides
в EmrCreateJobFlowOperator
и steps
в EmrAddStepsOperator
были установлены отдельными файлами JSON, расположенными в другом месте.
Я пробовал множество способов как напрямую связать файлы JSON, так и установить и получить переменные воздушного потока для JSON.Если бы я использовал переменные воздушного потока, их также нужно было бы динамически называть, с чем у меня проблемы.Я могу легко Variable.set
динамическое имя с помощью PythonOperator, но не могу Variable.get
динамическое имя в job_flow_overrides
или steps
из-за ограничений Airflow по написанию кода Python вне PythonOperator.
Переменные воздушного потока уже были установлены ранее в коде, ниже мой код пытается использовать данные JSON и настроить кластер
def get_global_json_contents():
return json.dumps(requests.get("PATH/TO/JSON/FILE").json())
# Use the 'Name' Key in this JSON as a specific identifier for the Variables created by this job
def get_global_json_name():
return json.loads(get_global_json_contents())['Name']
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
aws_conn_id='aws_default',
emr_conn_id='emr_default',
job_flow_overrides=json.loads(Variable.get("CLUSTER_SETUP-"+get_global_json_name())),
dag=dag
)
add_steps = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=json.loads(Variable.get("RUN_STEPS-"+get_global_json_name())),
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
Кто-нибудь знает, как я могу заставить этот процесс работать?
У кого-нибудь есть идеи, как обойти ограничения невозможности использования функций Python в Airflow за пределами PythonOperator
и python_callable
?
Возможно ли решить эту проблему?определив функции в отдельном файле Python, расположенном в другом месте, и импортировав его в Airflow?И если так, как бы я поступил так?