Как я могу перевести Spark Client submitApplication в API Yarn Rest? - PullRequest
0 голосов
/ 09 декабря 2018

В настоящее время у меня есть рабочая реализация кода для отправки приложения в Yarn с использованием spark.deploy.yarn.Client .Сложно агрегировать все аргументы, которые нужны этому клиенту, но подача приложения проста:

ClientArguments cArgs = new ClientArguments(args.toArray(new String[0]));
client = new Client(cArgs, sparkConf);
applicationID = client.submitApplication();

Большая часть кода до этого момента накапливала sparkConf и арг .Теперь я хочу удалить Client и работать только с Rest.Spark предлагает полный REST API, включая отправку приложений - в соответствии с документацией Spark речь идет об этом простом сообщении json / xml:

POST http://<rm http address:port>/ws/v1/cluster/apps
Accept: application/json
Content-Type: application/json
{
  "application-id":"application_1404203615263_0001",
  "application-name":"test",
  "am-container-spec":
{
  "local-resources":
  {
    "entry":
    [
      {
        "key":"AppMaster.jar",
        "value":
        {
          "resource":"hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar",
          "type":"FILE",
          "visibility":"APPLICATION",
          "size": 43004,
          "timestamp": 1405452071209
        }
      }
    ]
  },
  "commands":
  {
    "command":"{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
  },
  "environment":
  {
    "entry":
    [
      {
        "key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
        "value": "1405459400754"
      },
      {
        "key": "CLASSPATH",
        "value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
      },
      {
        "key": "DISTRIBUTEDSHELLSCRIPTLEN",
        "value": "6"
      },
      {
        "key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
        "value": "hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands"
      }
    ]
  }
},
"unmanaged-AM":false,
"max-app-attempts":2,
"resource":
{
  "memory":1024,
  "vCores":1
},
"application-type":"YARN",
"keep-containers-across-application-attempts":false,
"log-aggregation-context":
{
  "log-include-pattern":"file1",
  "log-exclude-pattern":"file2",
  "rolled-log-include-pattern":"file3",
  "rolled-log-exclude-pattern":"file4",
  "log-aggregation-policy-class-name":"org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy",
  "log-aggregation-policy-parameters":""
},
"attempt-failures-validity-interval":3600000,
"reservation-id":"reservation_1454114874_1",
"am-black-listing-requests":
{
  "am-black-listing-enabled":true,
  "disable-failure-threshold":0.01
}
}

Я пытался перевести свои аргументы в этот JSONтело запроса POST, но это кажется невозможным.Кто-нибудь знает, могу ли я выполнить обратный инжиниринг из запущенного приложения, которое я отправил полезной нагрузке JSON для отправки через REST?Или какое отображение я мог бы использовать, чтобы взять аргументы клиента и поместить их в JSON?

1 Ответ

0 голосов
/ 24 декабря 2018

После небольшого поиска мне удалось подать заявку только из REST API.Это не хорошо документированный процесс, поэтому я публикую его здесь.

ПРИМЕЧАНИЕ: , если в любой момент вы хотите сравнить содержание запроса с запросом, отправленным клиентомиспользуйте точки отладки для проверки контекста приложения, используемого клиентом.Откройте класс org.apache.hadoop.yarn.client.api.impl.YarnClientImpl и перейдите к методу submitApplication(ApplicationSubmissionContext appContext).

Во-первых, чтобы заменить spark.deploy.yarn.Client запросом REST API, решение должно убедиться, что все файлы, указанные в конфигурации, доступны наHDFS.Позже он должен составить и загрузить один дополнительный файл с именем __spark_conf__.zip.

Шаг 1

Просмотрите файлы из SparkConf (второй аргумент Client): файлы, упомянутые в " AllJars"тег, файл, указанный в" mainJarPath", и файлы, упомянутые в" FilesList".

Для каждого файла проверьте, существует ли он в HDFS, а если нет - загрузите его с локального компьютера.Для каждого файла получите его FileStatus из HDFS.агрегировать список ресурсов, который представляет собой карту атрибутов для каждого файла, содержащего эти 6 атрибутов:

  • size = getLen ()
  • timestamp = getModificationTime ()
  • тип= FILE
  • visibility = PUBLIC

Два других атрибута: ключ и ресурс.

  • Файлы из списка allJars: ключ spark_libs / {{имя_файла}}, ресурсом является имя файла.
  • Файлы из ключа FilesList: keyявляется тегом «localEntry», ресурс - тегом «hdfsPath».
  • Файл в mainJarPath: ключ - « app .jar», ресурс - это имя файла.

Шаг 2

Создание файла __spark_conf__.zip.Вы можете создать его непосредственно в hdf, в промежуточном пути, который обычно {{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip.Этот архивный файл содержит два файла и один пустой каталог: один файл __spark_hadoop_conf__.xml (переименованный в core-site.xml), а другой файл называется __spark_conf__.properties, который является слегка измененной версией раздела sparkConf из конфигурации.

Чтобы создать __spark_conf__.properties, вам нужно прочитать карту JSON из " sparkConf" -> "org $ apache $ spark $ SparkConf $$ settings " и преобразоватькаждая строка из формата JSON "spark.safemine.addcontrol.driverMemory": "5120M" до spark.safemine.addcontrol.driverMemory = 5120M

ВнизВ файле добавьте 6 новых строк:

  • spark.yarn.cache.confArchive = {{место, куда вы будете загружать __spark_conf__.zip в sparkStaging}}
  • spark.yarn.cache.visabilities = {{все видимости файлов, разделенные запятыми - в основном "PUBLIC, PUBLIC, ..., PUBLIC"}}
  • spark.yarn.cache.timestamps = {{Всевременные метки для файлов, разделенные запятыми}}
  • spark.yarn.cache.types = {{все типы файловs, разделенные запятой - в основном "FILE, FILE, ..., FILE"}}
  • spark.yarn.cache.filenames = {{Все имена файлов и ключи, записанные как ресурс # ключ и разделенные запятой}}
  • spark.yarn.cache.sizes = {{Все размеры файлов, разделенные запятыми}}

Убедитесь, что вы скомпилировали 5 агрегированных строк в соответствующем порядке.Я использовал этот код:

    String confArchive = "spark.yarn.cache.confArchive="+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
    String filenames = "spark.yarn.cache.filenames=";
    String sizes = "spark.yarn.cache.sizes=";
    String timestamps = "spark.yarn.cache.timestamps=";
    String types = "spark.yarn.cache.types=";
    String visibilities = "spark.yarn.cache.visibilities=";
    for (Map<String,String> localResource:localResources) {
        filenames+=localResource.get("resource")+"#"+localResource.get("key")+",";
        sizes+=localResource.get("size")+",";
        timestamps+=localResource.get("timestamp")+",";
        types+=localResource.get("type")+",";
        visibilities+=localResource.get("visibility")+",";

    }
    properties+=confArchive+"\n";
    properties+=filenames.substring(0,filenames.length()-1)+"\n";
    properties+=sizes.substring(0,sizes.length()-1)+"\n";
    properties+=timestamps.substring(0,timestamps.length()-1)+"\n";
    properties+=types.substring(0,types.length()-1)+"\n";
    properties+=visibilities.substring(0,visibilities.length()-1)+"\n";

Файл __spark_hadoop_conf__.xml представляет собой простое переименование core-site.xml, и созданная с ним папка называется __hadoop_conf__ и остается пустой.

Вы можете сохранить файлы в hdf напрямую, вот так:

private void generateSparkConfInHdfs(String applicationId, String userName, String sparkConfProperties, String sparkHadoopConf) throws IOException {
    String path = hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
    Path hdfsPath = new Path(path);
    ZipOutputStream os = new ZipOutputStream(getHdfs().create(hdfsPath));
    os.putNextEntry(new ZipEntry("__hadoop_conf__/"));
    os.putNextEntry(new ZipEntry("__spark_conf__.properties"));
    os.write(sparkConfProperties.getBytes(),0,sparkConfProperties.getBytes().length);
    os.putNextEntry(new ZipEntry("__spark_hadoop_conf__.xml"));
    os.write(sparkHadoopConf.getBytes(),0,sparkHadoopConf.getBytes().length);
    os.close();
}

После завершения создания файла добавьте его в список ресурсов со следующими характеристиками:

  • size = getLen()
  • timestamp = getModificationTime ()
  • type = ARCHIVE
  • visibility = PRIVATE
  • key = __spark_conf__
  • ресурс являетсяпромежуточный каталог (обычно {{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip).

Просмотрите полный список ресурсов и создайте из них XML / JSON с этой структурой для каждого, используя значения, которые мы собрали в {{}} заполнители:

    <entry>
        <key>{{key}}</key>
        <value>
            <resource>{{resource}}</resource>
            <size>{{size}}</size>
            <timestamp>{{timestamp}}</timestamp>
            <type>{{type}}</type>
            <visibility>{{visibility}}</visibility>
        </value>
    </entry>

Накопленной строкой будет ваш localResources XML-сегмент, показанный ниже.

Шаг 3

Генерация команды Java.Вам нужно извлечь несколько элементов из SparkConfig:

  • driverMemory - из того же атрибута в sparkConf
  • extraJavaOptions = из spark.driver.extraJavaOptions в коллекции атрибутов
  • mainClass - из того же атрибута в sparkConf
  • argstr - собрать все ClientArgs, кроме --class 1.

Команда результата с командойэлементы включают в себя:

String command = "$JAVA_HOME/bin/java -server -Xmx"+driverMemory+" -Djava.io.tmpdir=$PWD/tmp "+extraJavaOptions+" -Dspark.yarn.app.container.log.dir=&lt;LOG_DIR&gt; "
            + "org.apache.spark.deploy.yarn.ApplicationMaster --class "+mainClass+" "+argstr+" "
            + "--properties-file $PWD/__spark_conf__/__spark_conf__.properties 1&gt; &lt;LOG_DIR&gt;/stdout 2&gt; &lt;LOG_DIR&gt;/stderr";

Шаг 4

Компиляция XML-запроса.

ПРИМЕЧАНИЕ : моя реализация требуетметка на контейнере AM, поэтому добавлено выражение am-container-node-label-expression.Это не будет применимо во всех случаях.

Отображение из sparkConf в запрос REST (показано здесь в XML, также поддерживается реализация JSON):

<application-submission-context>
    <application-id>"+applicationId+"</application-id> 
    <application-name>"+appName+"</application-name>
    <queue>default</queue>
    <priority>0</priority>
    <am-container-spec>
       <local-resources>+localResources+</local-resources>
       <environment>
          <entry>
             <key>SPARK_YARN_STAGING_DIR</key>
             <value>"+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"</value>
          </entry>
          <entry>
             <key>CLASSPATH</key>
             <value>$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:/spark-non-hdfs-storage/spark-assembly-2.3.0-hadoop2.7/*:%HADOOP_CONF_DIR%:%HADOOP_COMMON_HOME%/share/hadoop/common/*:%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__</value>
          </entry>
          <entry>
             <key>SPARK_USER</key>
             <value>"+userName+"</value>
          </entry>
       </environment>
       <commands>
          <command>"+command+"</command>
       </commands>
    </am-container-spec>
    <unmanaged-AM>false</unmanaged-AM>
    <max-app-attempts>1</max-app-attempts>
    <resource>
      <memory>5632</memory>
      <vCores>1</vCores>
    </resource>
    <application-type>SPARK</application-type>
    <keep-containers-across-application-attempts>false</keep-containers-across-application-attempts>
    <application-tags>
      <tag>"+sparkYarnTag+"</tag>
    </application-tags>
    <am-container-node-label-expression>appMngr</am-container-node-label-expression>
    <log-aggregation-context/>
    <attempt-failures-validity-interval>1</attempt-failures-validity-interval>
    <reservation-id/>
</application-submission-context> 

Шаг 5:

Подача заявки через REST http PUT:

private void submitApplication (String body, String userName) throws SMSparkManagerException {
    HttpClient client = HttpClientBuilder.create().build();
    HttpPost request = new HttpPost(uri+"?user.name="+userName);
    try {
        request.setEntity(new StringEntity(body, ContentType.APPLICATION_XML));
        HttpResponse response = client.execute(request);
        if (response.getStatusLine().getStatusCode()!=202) {
            throw new SMSparkManagerException("The application could not be submitted to Yarn, response http code "+response.getStatusLine().getStatusCode());
        }
    } catch (UnsupportedEncodingException e) {
        logger.error("The application Could not be submitted due to UnsupportedEncodingException in the provided body: "+body, e );
        throw new SMSparkManagerException("Error in submitting application to yarn");
    } catch (ClientProtocolException e) {
        logger.error("The application Could not be submitted due to ClientProtocolException", e);
        throw new SMSparkManagerException("Error in submitting application to yarn");
    } catch (IOException e) {
        logger.error("The application Could not be submitted due to IOException", e);
        throw new SMSparkManagerException("Error in submitting application to yarn");
    }
}
...