Безусловно, это было бы наиболее эффективным использованием ресурсов.Позвольте мне предупредить вас: в этом много деталей;Я постараюсь перечислить столько, сколько вы получите.Я рекомендую вам добавить собственный исчерпывающий ответ, в котором перечислены все проблемы, с которыми вы столкнулись, и способ их решения (после того, как вы это сделаете)* Для создания и завершения кластера у вас есть EmrCreateJobFlowOperator
и EmrTerminateJobFlowOperator
соответственно
Не беспокойтесь, если вы не используетеAWS
SecretAccessKey
(и полностью полагаться на IAM
Роли );создание любого AWS
-связанного hook
или operator
в Airflow
будет автоматически откатится к базовому EC2
прикрепленному IAM
Роль
Если вы НЕ используете EMR-Steps API для отправки задания, вам также придется вручную воспринимать обе вышеуказанные операции, используя Sensors
.Уже есть датчик для фазы создания опроса, который называется EmrJobFlowSensor
, и вы можете слегка изменить его, чтобы создать датчик для завершения
Вы передаете свой кластер-конфигурацию JSONв job_flow_extra
.Вы также можете передавать конфиги в Connection
(например, my_emr_conn
) extra
param , но воздерживаться от этого, потому что это часто прерывает загрузку SQLAlchemy
ORM (так какэто большой json
)
Относительно отправки задания
Вы можете либо отправить задания в Emr
с помощью API EMR-StepsЭто можно сделать либо на этапе создания кластера (в JSON Cluster-Configs), либо впоследствии, используя add_job_flow_steps()
.Существует даже emr_add_steps_operator()
в Airflow
, для которого также требуется EmrStepSensor
.Вы можете прочитать больше об этом в AWS
документах , и вам также, возможно, придется использовать command-runner.jar
Для конкретных случаев применения(например, Hive
, Livy
), вы можете использовать их конкретными способами.Например, вы можете использовать HiveServer2Hook
, чтобы отправить Hive
работу.Вот сложная часть: run_job_flow()
вызов (сделанный на этапе создания кластера) дает вам только job_flow_id
(идентификатор кластера).Вам нужно будет использовать describe_cluster()
вызов , используя EmrHook
, чтобы получить частный IP-адрес главного узла .Используя это, вы сможете программно создать Connection
(например, Hive Server 2 Thrift
connection ) и использовать его для отправки ваших вычислений в кластер.И не забудьте удалить эти соединения (для удобства) до завершения рабочего процесса.
Наконец, есть старый добрый bash для взаимодействия с кластером.Для этого вы также должны передать пару ключей EC2
на этапе создания кластера .После этого вы можете программно создать SSH
соединение и использовать его (с SSHHook
или SSHOperator
) для запуска заданий в кластере.Подробнее о SSH-материалах в Airflow
здесь
Специально для отправки Spark
заданий в удаленных Emr
cluster, читать это обсуждение