Завершение кластера emr воздушного потока перед завершением шага - PullRequest
0 голосов
/ 31 мая 2019

Я запускаю кластер на EMR и отправляю некоторые шаги на нем с использованием воздушного потока.

Что я хочу:

Я хочу завершить кластер после завершения всех шагов, добавленных через EmrAddStepsOperator

Что я пробовал:

Я пытался использовать вместе EmrStepSensor и EmrTerminateJobFlowOperator, но мой шаг искры отменяется, и кластер прекращает работу без выполнения всех шагов

Кто-нибудь, пожалуйста, предложите, как это сделать правильно. вот мой код

dag = DAG('emr_job_flow_automatic_steps_17',
         default_args=default_args,
         schedule_interval="@daily",
         max_active_runs=1,
         catchup=True,
)


upload_to_S3_task = PythonOperator(
   task_id='upload_to_S3',
   python_callable=upload_file_to_S3,
   op_kwargs={
       'filename': '/home/ab/projects/test.py',
       'key': 'test.py',
       'bucket_name': 'dep-buck',
   },
   dag=dag)

cluster_creator = EmrCreateJobFlowOperator(
   task_id='create_job_flow2',
   job_flow_overrides=JOB_FLOW_OVERRIDES,
   aws_conn_id='aws_default',
   emr_conn_id='emr_default',
   dag=dag
)

step_adder = EmrAddStepsOperator(
   task_id='add_steps',
   job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
   aws_conn_id='aws_default',
   steps=step,
   dag=dag
)
step_checker = EmrStepSensor(
   task_id='watch_step',
   job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
   step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
   aws_conn_id='aws_default',
   dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
   task_id='remove_cluster',
   job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
   aws_conn_id='aws_default',
   dag=dag
)

upload_to_S3_task >> cluster_creator >> step_adder >> step_checker >> cluster_remover

Другие вопросы по stackoverflow:

Существует один аналогичный вопрос о стековом потоке, но он не получил правильного ответа (с использованием EmrTerminateJobFlowOperator)

1 Ответ

0 голосов
/ 31 мая 2019

Так что мне удалось это сделать.Проблема в том, что я подал EmrStepSensor все шаги как один stepadder, поэтому, как только он заканчивал, он завершал кластер.

Решение состоит в том, чтобы отделить все шаги и дать последний шаг'ID для EmrStepSensor.В качестве альтернативы я отделил только последний шаг с его отдельным сумматором шагов (step_adder_actual_step) от других и предоставил его EmrStepSensor

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=pre_step,
    dag=dag
)

step_adder_actual_step = EmrAddStepsOperator(
    task_id='actual_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=actual_step,
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('actual_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator >> step_adder_pre_step >> step_adder_actual_step >> step_checker >> cluster_remover 
...