EMR DAG завершается до завершения всех шагов - PullRequest
0 голосов
/ 29 октября 2018

Я использую операторы EMR CreateJobFlow, AddSteps, StepSensor и TerminateJobFlow в моей DAG, чтобы запустить кластер EMR, добавить шаги (2 приложения spark и dist-cp) и завершить работу, когда все шаги завершены или 1 не выполнен. Я могу сделать это, когда у меня есть двухэтапная группа доступности базы данных (1-й - приложение Spark, 2-й - dist-cp), однако, когда у меня есть 2 приложения Spark, кластер успешно выполняет 1-й шаг и завершает работу, не переходя к 2-й и 3-й шаги.

Через некоторое копание я вижу, что Airflow "тыкает" в шаги, чтобы убедиться, что они все еще работают. В этом случае кажется, что он «успешен» только после завершения 1 шага.

Мои искровые приложения довольно просты. Первый создает и записывает данные в локальную HDFS. Вторая считывает данные из HDFS и присоединяется к другому набору данных и записывает обратно в HDFS. Третий шаг - это s3-dist-cp для копирования данных из HDFS в s3. Все 3 шага могут быть успешно выполнены в Spark-Shell в интерактивном режиме или в виде заданий Spark-Submit. Я также сам клонировал кластер EMR (без воздушного потока) и увидел, что все шаги выполняются без ошибок, поэтому EMR и Spark здесь не проблема.

DAG ниже

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator \
    import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator \
    import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator \
    import EmrTerminateJobFlowOperator

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2)   
}

SPARK_TEST_STEPS = [
    {
        'Name': 'monthly_agg',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode',
                       'cluster',
                      '--class' ,
                      'AggApp',
                      's3://jar1.jar' ]
                        }
    },   
    {
        'Name': 'monthly_agg2',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                      '--deploy-mode',
                       'cluster',
                      '--class' ,
                      'SimpleApp',
                      's3:/jar2.jar' ]
                        }
    },  

    {
        'Name': 'copy-data',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['s3-dist-cp',
                      '--src',
                      '/tempo',
                      '--dest',
                       's3://mydata/'
                    ]
                        }
    }
]

JOB_FLOW_OVERRIDES = {
'Instances': {'Ec2SubnetId': 'subnet-mysubnetid', 
    'InstanceGroups': [
        {
            'Name': 'Master nodes',
            'Market': 'ON_DEMAND',
            'InstanceRole': 'MASTER',
            'InstanceType': 'r4.2xlarge',
            'InstanceCount': 1
        },
        {
            'Name': 'Slave nodes',
            'Market': 'ON_DEMAND',
            'InstanceRole': 'CORE',
            'InstanceType': 'r4.2xlarge',
            'InstanceCount': 8,
            'EbsConfiguration': {'EbsBlockDeviceConfigs':[{'VolumeSpecification':{'SizeInGB':128,'VolumeType':'gp2'},'VolumesPerInstance':1}],'EbsOptimized':True}
        }
    ]},
    'Name':'airflow-monthly_agg_custom',
    'Configurations': [
    {
    'Classification':'spark-defaults','Properties':
        {'spark.sql.crossJoin.enabled':'true',
        'spark.serializer':'org.apache.spark.serializer.KryoSerializer',
        'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version':'2',
        "maximizeResourceAllocation":"true"},
    'Configurations':[]
    },
    {
    'Classification':'spark-hive-site','Properties':
    {'hive.metastore.client.factory.class':'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}, 
    'Configurations':[]
    }
    ]}

dag = DAG(
    'monthly_agg_custom',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=4),
    schedule_interval='@once'
)

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

1 Ответ

0 голосов
/ 31 мая 2019

Проблема в том, что вы вводите EmrStepSensor все шаги как один stepadder, поэтому, как только вы заканчиваете, он завершает кластер.

Решение состоит в том, чтобы разделить все шаги и передать последний шаг в EmrStepSensor. В качестве альтернативы, вы можете только последний шаг с отдельным сумматором шагов (step_adder_actual_step) от других и предоставить его EmrStepSensor

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=pre_step,
    dag=dag
)

step_adder_actual_step = EmrAddStepsOperator(
    task_id='actual_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=actual_step,
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('actual_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator >> step_adder_pre_step >> step_adder_actual_step >> step_checker >> cluster_remover 
...