Я новичок в Airflow и занимаюсь разработкой небольшого задания, работающего на EMR, и столкнулся с проблемами на этапах EmrAddStepsOperator и / или step2sense. Спасибо за ваш отзыв и за чтение моего длинного кода ниже. Так как у меня не было пробега, я заподозрил проблемы в C. Кроме того, когда я пропустил step2sense [.], Я обнаружил, что ни один из step1main [.] Не выполнялся вообще, так как почти 0 времени выполнения для задачи, в то время как состояние EMR указывало «Отменено». Итак, я не знаю, имею ли я дело с той же проблемой (C выше) или двумя проблемами.
# A: declare dag and other parameters
# B: create EMR cluster
# C: setup specific list/dict of "tasks," for example ['task1', 'task2', 'task3']
# D: terminate cluster after completion/failure
# E: set dependency: chaining tasks from B to D although ideally should be flexible in C
# A: dag = ...; JOB_FLOW_OVERRIDES = ...
# B
step0_create_emr = EmrCreateJobFlowOperator(task_id='step0_new_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES, aws_conn_id='aws_default',
emr_conn_id='emr_default', dag=_dag)
# C to continue
tasks = ['task1', 'task2', 'task3'] # to cont in loop below
# D
step3_terminate_emr = # setClusterRemover(dag, jobParams["JobFlowId"])
EmrTerminateJobFlowOperator(task_id='step3_terminate_cluster',
job_flow_id='step0_new_job_flow',aws_conn_id='aws_default',dag=_dag)
# C (cont')
step1main, step2sense = [], []
for i, t in enumerate(tasks):
task1_id = "step1main_{}".format(t)
step1main.append(EmrAddStepsOperator(task_id=_task1_id,
job_flow_id="{{ task_instance.xcom_pull('step0_new_job_flow', key='return_value') }}",
aws_conn_id='aws_default', # and steps=MY_TASK_CONFIGS[t],
dag=dag
))
# ??? below for sense_step_id and task2_id BUT very questionable due to '{{ ... }}'
formatter_str = "{} task_instance.xcom_pull('{}', key='return_value')[0] {}"
sense_step_id = formatter_str.format("{{", task1_id, "}}")
task2_id = "step2sense_{}".format(t)
step2sense.append(EmrStepSensor(
task_id=_task2_id,
job_flow_id="{{ task_instance.xcom_pull('step0_new_job_flow', key='return_value') }}",
step_id=sense_step_id,
aws_conn_id='aws_default',
dag=dag
))
step1main[i] >> step2sense[i]
# E
step0_create_emr >> step1main[0]
step3_terminate_emr << step2sense[-1]
Еще раз, спасибо за сообщество здесь.