В моем DAG EMRStepSensor не работает во время просмотра шага.Это DAG
dag >> инициализатор >> create_cluster >> emr_step >> watch_step >> cluster_remover
Группа обеспечения доступности баз данных инициализирована, кластер EMR создан, EMRшаг был успешно добавлен.Сбой EMRStepSensor с ошибкой
botocore.exceptions.ClientError: Произошла ошибка (500) при вызове операции DescribeStep (достигнуто максимальное количество попыток: 4): Внутренняя ошибка сервера
Я проверил код Python.
emr_step_sensor.py вызывает client.py и выполняет вызов операции Describe и завершается ошибкой
initializer = InitializerOperator(
task_id='initializer',
dag=dag
)
job_flow = EmrJobFlowOperator(
task_id='create_cluster',
aws_conn_id='worflow_aws',
config_file='dev_loader.cfg',
initializer_task_id='initializer',
core_instance_count=20,
dag=dag
)
step_add = EmrAddStepOperator(
task_id='emr_step',
aws_conn_id='worflow_aws',
config_file='dev_loader_step.cfg',
job_flow_id="{{ task_instance.xcom_pull('create_cluster', key='return_value') }}",
step_id='cassandraloader',
initializer_task_id='initializer',
dag=dag
)
step_checker = EmrStepCheckSensor(
task_id='watch_step',
aws_conn_id='worflow_aws',
poke_interval=300,
job_flow_id="{{ task_instance.xcom_pull('create_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('emr_step', key='return_value')[0] }}",
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
aws_conn_id='worflow_aws',
job_flow_id="{{ task_instance.xcom_pull('create_cluster', key='return_value') }}",
dag=dag
)
EMRStepsensor должен продолжать отслеживать состояниешага.