Я запускаю кластер на EMR и отправляю некоторые шаги на нем с использованием воздушного потока.
Что я хочу:
Я хочу завершить кластер после завершения всех шагов, добавленных через EmrAddStepsOperator
Что я пробовал:
Я пытался использовать вместе EmrStepSensor
и EmrTerminateJobFlowOperator
, но мой шаг искры отменяется, и кластер прекращает работу без выполнения всех шагов
Кто-нибудь, пожалуйста, предложите, как это сделать правильно. вот мой код
dag = DAG('emr_job_flow_automatic_steps_17',
default_args=default_args,
schedule_interval="@daily",
max_active_runs=1,
catchup=True,
)
upload_to_S3_task = PythonOperator(
task_id='upload_to_S3',
python_callable=upload_file_to_S3,
op_kwargs={
'filename': '/home/ab/projects/test.py',
'key': 'test.py',
'bucket_name': 'dep-buck',
},
dag=dag)
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow2',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
steps=step,
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
upload_to_S3_task >> cluster_creator >> step_adder >> step_checker >> cluster_remover
Другие вопросы по stackoverflow:
Существует один аналогичный вопрос о стековом потоке, но он не получил правильного ответа (с использованием EmrTerminateJobFlowOperator)