В настоящее время я следую приведенному здесь шаблону: https://github.com/apache/airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py, чтобы создать группу обеспечения доступности баз данных для вызова экземпляра emr с использованием spark submit. При настройке spark_test_steps мне нужно включить переменные, переданные из POST Json, чтобы заполнить отправку spark, как показано ниже:
SPARK_TEST_STEPS = [
{
'Name': 'calculate_pi',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'/usr/lib/spark/bin/run-example',
'SparkPi',
kwargs['dag_run'].conf['var_1']
kwargs['dag_run'].conf['var_2']
'10'
]
}
}
]
Как передать переменные, заданные POST Json, при этом следуя форматуприведенная в ссылке на git выглядит следующим образом?
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
DEFAULT_ARGS = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False
}
dag = DAG(
'emr_job_flow_manual_steps_dag',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
schedule_interval='0 3 * * *'
)
var_1 = ''
var_2 = ''
SPARK_TEST_STEPS = []
def define_param(**kwargs):
global var_1
global var_2
global SPARK_TEST_STEPS
var_1 = str(kwargs['dag_run'].conf['var_1'])
var_2 = str(kwargs['dag_run'].conf['var_2'])
SPARK_TEST_STEPS = [
{
'Name': 'calculate_pi',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'/usr/lib/spark/bin/run-example',
'SparkPi',
kwargs['dag_run'].conf['var_1']
kwargs['dag_run'].conf['var_2']
'10'
]
}
}
]
return SPARK_TEST_STEPS
DEFINE_PARAMETERS = PythonOperator(
task_id='DEFINE_PARAMETERS',
python_callable=define_param,
provide_context=True,
dag=dag)
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps='{{ ti.xcom_pull(task_ids="DEFINE_PARAMETERS") }}',
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)
Я не могу использовать Variable.get и Variable.set, так как это не разрешит множественные вызовы dag для разных типов переменных одновременно из-за постоянного измененияглобальных переменных воздушного потока. Я попытался вызвать SPARK_TEST_STEPS с помощью xcom, но тип возвращаемого значения xcom - строка, а для шагов EmrAddStepsOperator требуется список.