Создание кластера EMR с использованием запуска Airflow dag. После выполнения задачи EMR будет прервана. - PullRequest
0 голосов
/ 18 марта 2019

У меня есть задания Airflow, которые нормально работают на кластере EMR.что мне нужно, скажем, если у меня есть 4 задания потока воздуха, для которых требуется кластер EMR, скажем, 20 минут для выполнения задачи.почему бы нам не создать кластер EMR во время выполнения DAG, и как только задание будет завершено, оно завершит созданный кластер EMR.

Ответы [ 2 ]

0 голосов
/ 19 марта 2019

Безусловно, это было бы наиболее эффективным использованием ресурсов.Позвольте мне предупредить вас: в этом много деталей;Я постараюсь перечислить столько, сколько вы получите.Я рекомендую вам добавить собственный исчерпывающий ответ, в котором перечислены все проблемы, с которыми вы столкнулись, и способ их решения (после того, как вы это сделаете)* Для создания и завершения кластера у вас есть EmrCreateJobFlowOperator и EmrTerminateJobFlowOperator соответственно

Не беспокойтесь, если вы не используетеAWS SecretAccessKey (и полностью полагаться на IAM Роли );создание любого AWS -связанного hook или operator в Airflow будет автоматически откатится к базовому EC2 прикрепленному IAM Роль

Если вы НЕ используете EMR-Steps API для отправки задания, вам также придется вручную воспринимать обе вышеуказанные операции, используя Sensors.Уже есть датчик для фазы создания опроса, который называется EmrJobFlowSensor, и вы можете слегка изменить его, чтобы создать датчик для завершения

Вы передаете свой кластер-конфигурацию JSONв job_flow_extra.Вы также можете передавать конфиги в Connection (например, my_emr_conn) extra param , но воздерживаться от этого, потому что это часто прерывает загрузку SQLAlchemy ORM (так какэто большой json)


Относительно отправки задания

  • Вы можете либо отправить задания в Emr с помощью API EMR-StepsЭто можно сделать либо на этапе создания кластера (в JSON Cluster-Configs), либо впоследствии, используя add_job_flow_steps().Существует даже emr_add_steps_operator() в Airflow, для которого также требуется EmrStepSensor.Вы можете прочитать больше об этом в AWS документах , и вам также, возможно, придется использовать command-runner.jar

  • Для конкретных случаев применения(например, Hive, Livy), вы можете использовать их конкретными способами.Например, вы можете использовать HiveServer2Hook, чтобы отправить Hive работу.Вот сложная часть: run_job_flow() вызов (сделанный на этапе создания кластера) дает вам только job_flow_id (идентификатор кластера).Вам нужно будет использовать describe_cluster() вызов , используя EmrHook, чтобы получить частный IP-адрес главного узла .Используя это, вы сможете программно создать Connection (например, Hive Server 2 Thrift connection ) и использовать его для отправки ваших вычислений в кластер.И не забудьте удалить эти соединения (для удобства) до завершения рабочего процесса.

  • Наконец, есть старый добрый bash для взаимодействия с кластером.Для этого вы также должны передать пару ключей EC2 на этапе создания кластера .После этого вы можете программно создать SSH соединение и использовать его (с SSHHook или SSHOperator) для запуска заданий в кластере.Подробнее о SSH-материалах в Airflow здесь

  • Специально для отправки Spark заданий в удаленных Emr cluster, читать это обсуждение


0 голосов
/ 18 марта 2019

Лучший способ сделать это, вероятно, иметь узел в корне вашей DAG Airflow, который создает кластер EMR, а затем еще один узел в самом конце DAG, который раскручивает кластер после всех остальных узлов. завершено.

...