Использование входных переменных Json в операторе EMR воздушного потока - PullRequest
0 голосов
/ 19 сентября 2019

В настоящее время я следую приведенному здесь шаблону: https://github.com/apache/airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py, чтобы создать группу обеспечения доступности баз данных для вызова экземпляра emr с помощью spark submit.При настройке spark_test_steps мне нужно включить переменные, переданные из POST Json, чтобы заполнить отправку spark, как показано ниже:

SPARK_TEST_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example',
                'SparkPi',
                kwargs['dag_run'].conf['var_1']
                kwargs['dag_run'].conf['var_2']
                kwargs['dag_run'].conf['var_3']
                '10'
            ]
        }
    }
]

Как я могу передать переменные, заданные POST Json, при этом следуя форматуприведенный в ссылке на git, чтобы выглядеть как ниже?

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator \
    import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator \
    import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator \
    import EmrTerminateJobFlowOperator

DEFAULT_ARGS = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

SPARK_TEST_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example',
                'SparkPi',
                kwargs['dag_run'].conf['var_1']
                kwargs['dag_run'].conf['var_2']
                kwargs['dag_run'].conf['var_3']
                '10'
            ]
        }
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc'
}

dag = DAG(
    'emr_job_flow_manual_steps_dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *'
)

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

1 Ответ

0 голосов
/ 20 сентября 2019

Вчера я имел дело с подобной проблемой и исправил ее с помощью такого решения.Если у вас есть файл, содержащий JSON, вы можете использовать Переменные , заданные через Admin-> Переменные .

Используйте PythonOperator для чтения в файле JSON и сохранения его в локальной переменной, установите переменную воздушного потока, используя Variable.set("VARIABLE_NAME",JSON_CONTENTS_VARIABLE).

Затем вы можете установить содержимое JSON переменной в шаг в EmrAddStepsOperator , вызвав steps=Variable.get("VARIABLE_NAME", deserialize_json=True)

Надеюсь, это поможет.

...