EmrStepSensor - Произошла ошибка (500) при вызове операции DescribeStep - PullRequest
0 голосов
/ 27 мая 2019

В моем DAG EMRStepSensor не работает во время просмотра шага.Это DAG

dag >> инициализатор >> create_cluster >> emr_step >> watch_step >> cluster_remover

Группа обеспечения доступности баз данных инициализирована, кластер EMR создан, EMRшаг был успешно добавлен.Сбой EMRStepSensor с ошибкой

botocore.exceptions.ClientError: Произошла ошибка (500) при вызове операции DescribeStep (достигнуто максимальное количество попыток: 4): Внутренняя ошибка сервера

Я проверил код Python.

emr_step_sensor.py вызывает client.py и выполняет вызов операции Describe и завершается ошибкой

initializer = InitializerOperator(
    task_id='initializer',
    dag=dag
)

job_flow = EmrJobFlowOperator(
    task_id='create_cluster',
    aws_conn_id='worflow_aws',
    config_file='dev_loader.cfg',
    initializer_task_id='initializer',
    core_instance_count=20,
    dag=dag
)


step_add = EmrAddStepOperator(
    task_id='emr_step',
    aws_conn_id='worflow_aws',
    config_file='dev_loader_step.cfg',
    job_flow_id="{{ task_instance.xcom_pull('create_cluster', key='return_value') }}",
    step_id='cassandraloader',
    initializer_task_id='initializer',
    dag=dag
)

step_checker = EmrStepCheckSensor(
    task_id='watch_step',
    aws_conn_id='worflow_aws',
    poke_interval=300,
    job_flow_id="{{ task_instance.xcom_pull('create_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('emr_step', key='return_value')[0] }}",
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    aws_conn_id='worflow_aws',
    job_flow_id="{{ task_instance.xcom_pull('create_cluster', key='return_value') }}",
    dag=dag
)

EMRStepsensor должен продолжать отслеживать состояниешага.

...