Я пытаюсь запустить Airflow dag, который создает кластер EMR, добавляет несколько шагов, проверяет их и, наконец, завершает созданный кластер EMR.Но когда я запускаю Airflow Dag, он постоянно находится в рабочем состоянии и не показывает никаких ошибок или журналов.
Может кто-нибудь сказать мне, что я делаю здесь неправильно ??Есть ли пропущенный параметр, который я должен добавить?Или это проблема с графиком дага?
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import
EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import
EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import
EmrTerminateJobFlowOperator
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False
}
HIVE_CLOUDFRONT = [
{
'Name': 'cloudfront',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'hive-script',
'--run-hive-script',
'--args',
'-f',
's3://BUCKET/xnder/scripts/Hive_CloudFront.q',
'-d',
'INPUT=s3://BUCKET/',
'-d',
'OUTPUT=s3://BUCKET/output5/'
]
}
}
]
JOB_FLOW_OVERRIDES = {
'Name' : 'test1212',
'LogUri' : 's3://BUCKET/log.txt',
'ReleaseLabel' : 'emr-4.1.0',
'Instances' : {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm1.large',
'InstanceCount': 1,
},
{
'Name': 'Slave nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm1.large',
'InstanceCount': 1,
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False
},
'Applications':[{
'Name': 'Hadoop'
}],
'JobFlowRole':'EMR_EC2_DefaultRole',
'ServiceRole':'EMR_DefaultRole'
}
dag = DAG(
'emr_test_manual',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
#schedule_interval='0 3 * * *'
#schedule_interval=timedelta(seconds=10)
schedule_interval='@once'
)
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=HIVE_CLOUDFRONT,
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)