У меня есть кластер EMR в AWS. Я хочу запустить DAG из воздушного потока в существующий кластер AWS - PullRequest
1 голос
/ 14 октября 2019

У меня есть машина с воздушным потоком, которая имеет версию apache-airflow == 1.10.5. Я знаю, как запустить DAG, который автоматически создает кластер и выполняет шаг и завершить кластер. Используя соединения в интерфейсе воздушного потока, я могу добиться этого. Но чтобы запустить dag на существующем кластере aws emr, я не могу знать, какие параметры мне нужно передать в соединениях.

AIRFLOW UI -> Admin -> Connections -> Creed Conn ID (EMR Default1), тип conn Elastic Map уменьшить.

[2019-10-14 12:12:40,919] {taskinstance.py:1051} ERROR - Parameter validation failed:
Missing required parameter in input: "Instances"
Traceback (most recent call last):
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/operators/emr_create_job_flow_operator.py", line 68, in execute
    response = emr.create_job_flow(self.job_flow_overrides)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/hooks/emr_hook.py", line 55, in create_job_flow
    response = self.get_conn().run_job_flow(**config)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 586, in _make_api_call
    api_params, operation_model, context=request_context)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 621, in _convert_to_request_dict
    api_params, operation_model)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/validate.py", line 291, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in input: "Instances"
[2019-10-14 12:12:40,920] {taskinstance.py:1082} INFO - Marking task as FAILED.

1 Ответ

1 голос
/ 23 октября 2019

В первом случае вместо динамического создания / завершения кластеров с помощью пользовательского интерфейса вы также можете достичь этого, расширив оператор SparkSubmitOperator. После запуска кластера EMR вы можете скопировать файлы * .xml (например, core-site.xml ) из мастера EMR в какое-то место на узле воздушного потока, а затем указать на эти файлы в вашей задаче spark-submit врасход воздуха. По крайней мере, мы делаем это в тот день в нашем продукте. Если говорить логически, если вы планируете повторно использовать существующий кластер, все, что вам нужно, это знать, где эти * .xml файлы уже хранятся. Тогда все остальное будет таким же. Вы должны ссылаться только на эти файлы при запуске задачи.

Подробнее

Я не знаю ни одного такого документа, поэтому я могу только предложить вамисследуйте следующее, основанное на знаниях, которые я собрал:

  1. Нам нужно написать собственный плагин для spark-submit. В рамках этого модуля пользовательских плагинов давайте определим класс CustomSparkSubmitOperator. Нужно продлить BaseOperator. Вы можете найти множество статей о написании пользовательских плагинов в airflow. Это может быть хорошим местом для начала. Здесь , вы можете увидеть более подробную информацию о BaseOperator.

  2. В BaseOperator вы найдете метод с именем pre_execute. В этом методе можно выполнить следующие действия:

    a. Подождите, пока ваш кластер не работает. Вы можете легко сделать это, используя boto3, если передаете идентификатор кластера.

    b. Как только кластер запущен, получите ip главного узла EMR и скопируйте материалы, соответствующие /etc/hadoop/conf/*-site.xml, на ваш узел воздушного потока. Это возможно с помощью вызова подпроцесса в python.

  3. Как только вы получите xml-файл, в методе execute просто используйте SparkSubmitHook, чтобы отправить свое искровое задание. Вам нужно убедиться, что искровые двоичные файлы на вашем узле воздушного потока используют этот путь для искровой отправки.

  4. Вы можете очистить кластер методом post_execute в случае необходимости.

...