Как использовать Airflow для перезапуска неудачного искрового задания структурированной потоковой передачи? - PullRequest
1 голос
/ 13 июля 2020

Мне нужно запустить задание искры структурированной потоковой передачи в AWS EMR. В качестве требования к устойчивости, если искровое задание не удалось по каким-либо причинам, мы надеемся, что искровое задание можно воссоздать в EMR. Это похоже на оркестровку задач в ECS, которая может перезапустить задачу, если проверка работоспособности не удалась. Однако EMR - это скорее вычислительный механизм, чем система оркестровки.

Я ищу какой-нибудь инструмент оркестровки рабочих процессов с большими данными, например Airflow. Однако он не может поддерживать цикл в DAG. Как я могу реализовать некоторые функции, как показано ниже?

step_adder (EmrAddStepsOperator) >> step_checker (EmrStepSensor) >> step_adder (EmrAddStepsOperator).

Каков предлагаемый способ повышения устойчивости такого уровня задания? Любые комментарии приветствуются!

1 Ответ

1 голос
/ 13 июля 2020
• 1000 В вашем случае Sensor может помочь определить, произошло ли определенное условие или нет. Исходя из этого, вы можете решить в DAG. Вот простой HttpSensor, который ждет пакетного задания, чтобы убедиться, что оно успешно завершено
wait_batch_to_finish = HttpSensor(
    http_conn_id='spark_web',
    task_id="wait_batch_to_finish",
    method="GET",
    headers={"Content-Type": "application/json"},
    endpoint="/json",
    response_check=lambda response: check_spark_status(response, "{{ ti.xcom_pull('batch_intel_task')}}"),
    poke_interval=60,
    dag=dag
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...