Поток воздуха на EMR: как настроить несколько задач EMR в одном кластере - PullRequest
0 голосов
/ 07 апреля 2020

Я новичок в Airflow и занимаюсь разработкой небольшого задания, работающего на EMR, и столкнулся с проблемами на этапах EmrAddStepsOperator и / или step2sense. Спасибо за ваш отзыв и за чтение моего длинного кода ниже. Так как у меня не было пробега, я заподозрил проблемы в C. Кроме того, когда я пропустил step2sense [.], Я обнаружил, что ни один из step1main [.] Не выполнялся вообще, так как почти 0 времени выполнения для задачи, в то время как состояние EMR указывало «Отменено». Итак, я не знаю, имею ли я дело с той же проблемой (C выше) или двумя проблемами.

# A: declare dag and other parameters
# B: create EMR cluster
# C: setup specific list/dict of "tasks," for example ['task1', 'task2', 'task3'] 
# D: terminate cluster after completion/failure
# E: set dependency: chaining tasks from B to D although ideally should be flexible in C  

# A: dag = ...; JOB_FLOW_OVERRIDES = ...
# B
step0_create_emr = EmrCreateJobFlowOperator(task_id='step0_new_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES, aws_conn_id='aws_default',
    emr_conn_id='emr_default', dag=_dag) 

# C to continue
tasks = ['task1', 'task2', 'task3']     # to cont in loop below

# D 
step3_terminate_emr = # setClusterRemover(dag, jobParams["JobFlowId"])
EmrTerminateJobFlowOperator(task_id='step3_terminate_cluster',
    job_flow_id='step0_new_job_flow',aws_conn_id='aws_default',dag=_dag)

# C (cont') 
step1main, step2sense = [], []
for i, t in enumerate(tasks):
    task1_id = "step1main_{}".format(t)       
    step1main.append(EmrAddStepsOperator(task_id=_task1_id,
        job_flow_id="{{ task_instance.xcom_pull('step0_new_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',  # and steps=MY_TASK_CONFIGS[t],
        dag=dag
    ))
    # ??? below for sense_step_id and task2_id BUT very questionable due to '{{ ... }}'
    formatter_str = "{} task_instance.xcom_pull('{}', key='return_value')[0] {}"
    sense_step_id = formatter_str.format("{{", task1_id, "}}") 
    task2_id = "step2sense_{}".format(t)
    step2sense.append(EmrStepSensor(
        task_id=_task2_id,
        job_flow_id="{{ task_instance.xcom_pull('step0_new_job_flow', key='return_value') }}",
        step_id=sense_step_id,
        aws_conn_id='aws_default',
        dag=dag
    ))
    step1main[i] >> step2sense[i]

# E 
step0_create_emr >> step1main[0]
step3_terminate_emr << step2sense[-1]

Еще раз, спасибо за сообщество здесь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...