Поток воздуха DAG EMR EmrCreateJobFlowOperator Ничего не делает - PullRequest
0 голосов
/ 04 февраля 2019

Я пытаюсь запустить Airflow dag, который создает кластер EMR, добавляет несколько шагов, проверяет их и, наконец, завершает созданный кластер EMR.Но когда я запускаю Airflow Dag, он постоянно находится в рабочем состоянии и не показывает никаких ошибок или журналов.

Может кто-нибудь сказать мне, что я делаю здесь неправильно ??Есть ли пропущенный параметр, который я должен добавить?Или это проблема с графиком дага?

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import 
EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import 
EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import 
EmrTerminateJobFlowOperator

DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False
}


HIVE_CLOUDFRONT = [
{
    'Name': 'cloudfront',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': [
            'hive-script',
             '--run-hive-script',
             '--args',
             '-f', 
             's3://BUCKET/xnder/scripts/Hive_CloudFront.q',
              '-d',
                            'INPUT=s3://BUCKET/',
             '-d',
                            'OUTPUT=s3://BUCKET/output5/'
        ]
    }
}
]

JOB_FLOW_OVERRIDES = {
'Name' : 'test1212',
'LogUri' : 's3://BUCKET/log.txt',
'ReleaseLabel' : 'emr-4.1.0',
'Instances' : {
  'InstanceGroups': [
        {
            'Name': 'Master nodes',
            'Market': 'ON_DEMAND',
            'InstanceRole': 'MASTER',
            'InstanceType': 'm1.large',
            'InstanceCount': 1,
        },
        {
            'Name': 'Slave nodes',
            'Market': 'ON_DEMAND',
            'InstanceRole': 'CORE',
            'InstanceType': 'm1.large',
            'InstanceCount': 1,
        }
    ],
    'KeepJobFlowAliveWhenNoSteps': True,
    'TerminationProtected': False
},
'Applications':[{ 
    'Name': 'Hadoop'
 }],
'JobFlowRole':'EMR_EC2_DefaultRole',
'ServiceRole':'EMR_DefaultRole'
}

dag = DAG(
'emr_test_manual',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
#schedule_interval='0 3 * * *'
#schedule_interval=timedelta(seconds=10)
schedule_interval='@once'
)

cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)

step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=HIVE_CLOUDFRONT,
dag=dag
)

step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)

cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)
...