После небольшого поиска мне удалось подать заявку только из REST API.Это не хорошо документированный процесс, поэтому я публикую его здесь.
ПРИМЕЧАНИЕ: , если в любой момент вы хотите сравнить содержание запроса с запросом, отправленным клиентомиспользуйте точки отладки для проверки контекста приложения, используемого клиентом.Откройте класс org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
и перейдите к методу submitApplication(ApplicationSubmissionContext appContext)
.
Во-первых, чтобы заменить spark.deploy.yarn.Client
запросом REST API, решение должно убедиться, что все файлы, указанные в конфигурации, доступны наHDFS.Позже он должен составить и загрузить один дополнительный файл с именем __spark_conf__.zip
.
Шаг 1
Просмотрите файлы из SparkConf
(второй аргумент Client
): файлы, упомянутые в " AllJars"тег, файл, указанный в" mainJarPath", и файлы, упомянутые в" FilesList".
Для каждого файла проверьте, существует ли он в HDFS, а если нет - загрузите его с локального компьютера.Для каждого файла получите его FileStatus
из HDFS.агрегировать список ресурсов, который представляет собой карту атрибутов для каждого файла, содержащего эти 6 атрибутов:
- size = getLen ()
- timestamp = getModificationTime ()
- тип= FILE
- visibility = PUBLIC
Два других атрибута: ключ и ресурс.
- Файлы из списка allJars: ключ spark_libs / {{имя_файла}}, ресурсом является имя файла.
- Файлы из ключа FilesList: keyявляется тегом «localEntry», ресурс - тегом «hdfsPath».
- Файл в mainJarPath: ключ - « app .jar», ресурс - это имя файла.
Шаг 2
Создание файла __spark_conf__.zip
.Вы можете создать его непосредственно в hdf, в промежуточном пути, который обычно {{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip
.Этот архивный файл содержит два файла и один пустой каталог: один файл __spark_hadoop_conf__.xml
(переименованный в core-site.xml
), а другой файл называется __spark_conf__.properties
, который является слегка измененной версией раздела sparkConf из конфигурации.
Чтобы создать __spark_conf__.properties
, вам нужно прочитать карту JSON из " sparkConf" -> "org $ apache $ spark $ SparkConf $$ settings " и преобразоватькаждая строка из формата JSON "spark.safemine.addcontrol.driverMemory": "5120M" до spark.safemine.addcontrol.driverMemory = 5120M
ВнизВ файле добавьте 6 новых строк:
- spark.yarn.cache.confArchive = {{место, куда вы будете загружать
__spark_conf__.zip
в sparkStaging}} - spark.yarn.cache.visabilities = {{все видимости файлов, разделенные запятыми - в основном "PUBLIC, PUBLIC, ..., PUBLIC"}}
- spark.yarn.cache.timestamps = {{Всевременные метки для файлов, разделенные запятыми}}
- spark.yarn.cache.types = {{все типы файловs, разделенные запятой - в основном "FILE, FILE, ..., FILE"}}
- spark.yarn.cache.filenames = {{Все имена файлов и ключи, записанные как ресурс # ключ и разделенные запятой}}
- spark.yarn.cache.sizes = {{Все размеры файлов, разделенные запятыми}}
Убедитесь, что вы скомпилировали 5 агрегированных строк в соответствующем порядке.Я использовал этот код:
String confArchive = "spark.yarn.cache.confArchive="+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
String filenames = "spark.yarn.cache.filenames=";
String sizes = "spark.yarn.cache.sizes=";
String timestamps = "spark.yarn.cache.timestamps=";
String types = "spark.yarn.cache.types=";
String visibilities = "spark.yarn.cache.visibilities=";
for (Map<String,String> localResource:localResources) {
filenames+=localResource.get("resource")+"#"+localResource.get("key")+",";
sizes+=localResource.get("size")+",";
timestamps+=localResource.get("timestamp")+",";
types+=localResource.get("type")+",";
visibilities+=localResource.get("visibility")+",";
}
properties+=confArchive+"\n";
properties+=filenames.substring(0,filenames.length()-1)+"\n";
properties+=sizes.substring(0,sizes.length()-1)+"\n";
properties+=timestamps.substring(0,timestamps.length()-1)+"\n";
properties+=types.substring(0,types.length()-1)+"\n";
properties+=visibilities.substring(0,visibilities.length()-1)+"\n";
Файл __spark_hadoop_conf__.xml
представляет собой простое переименование core-site.xml
, и созданная с ним папка называется __hadoop_conf__
и остается пустой.
Вы можете сохранить файлы в hdf напрямую, вот так:
private void generateSparkConfInHdfs(String applicationId, String userName, String sparkConfProperties, String sparkHadoopConf) throws IOException {
String path = hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
Path hdfsPath = new Path(path);
ZipOutputStream os = new ZipOutputStream(getHdfs().create(hdfsPath));
os.putNextEntry(new ZipEntry("__hadoop_conf__/"));
os.putNextEntry(new ZipEntry("__spark_conf__.properties"));
os.write(sparkConfProperties.getBytes(),0,sparkConfProperties.getBytes().length);
os.putNextEntry(new ZipEntry("__spark_hadoop_conf__.xml"));
os.write(sparkHadoopConf.getBytes(),0,sparkHadoopConf.getBytes().length);
os.close();
}
После завершения создания файла добавьте его в список ресурсов со следующими характеристиками:
- size = getLen()
- timestamp = getModificationTime ()
- type = ARCHIVE
- visibility = PRIVATE
- key =
__spark_conf__
- ресурс являетсяпромежуточный каталог (обычно
{{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip
).
Просмотрите полный список ресурсов и создайте из них XML / JSON с этой структурой для каждого, используя значения, которые мы собрали в {{}} заполнители:
<entry>
<key>{{key}}</key>
<value>
<resource>{{resource}}</resource>
<size>{{size}}</size>
<timestamp>{{timestamp}}</timestamp>
<type>{{type}}</type>
<visibility>{{visibility}}</visibility>
</value>
</entry>
Накопленной строкой будет ваш localResources
XML-сегмент, показанный ниже.
Шаг 3
Генерация команды Java.Вам нужно извлечь несколько элементов из SparkConfig:
- driverMemory - из того же атрибута в
sparkConf
- extraJavaOptions = из
spark.driver.extraJavaOptions
в коллекции атрибутов - mainClass - из того же атрибута в
sparkConf
- argstr - собрать все
ClientArgs
, кроме --class 1.
Команда результата с командойэлементы включают в себя:
String command = "$JAVA_HOME/bin/java -server -Xmx"+driverMemory+" -Djava.io.tmpdir=$PWD/tmp "+extraJavaOptions+" -Dspark.yarn.app.container.log.dir=<LOG_DIR> "
+ "org.apache.spark.deploy.yarn.ApplicationMaster --class "+mainClass+" "+argstr+" "
+ "--properties-file $PWD/__spark_conf__/__spark_conf__.properties 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr";
Шаг 4
Компиляция XML-запроса.
ПРИМЕЧАНИЕ : моя реализация требуетметка на контейнере AM, поэтому добавлено выражение am-container-node-label-expression.Это не будет применимо во всех случаях.
Отображение из sparkConf в запрос REST (показано здесь в XML, также поддерживается реализация JSON):
<application-submission-context>
<application-id>"+applicationId+"</application-id>
<application-name>"+appName+"</application-name>
<queue>default</queue>
<priority>0</priority>
<am-container-spec>
<local-resources>+localResources+</local-resources>
<environment>
<entry>
<key>SPARK_YARN_STAGING_DIR</key>
<value>"+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"</value>
</entry>
<entry>
<key>CLASSPATH</key>
<value>$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:/spark-non-hdfs-storage/spark-assembly-2.3.0-hadoop2.7/*:%HADOOP_CONF_DIR%:%HADOOP_COMMON_HOME%/share/hadoop/common/*:%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__</value>
</entry>
<entry>
<key>SPARK_USER</key>
<value>"+userName+"</value>
</entry>
</environment>
<commands>
<command>"+command+"</command>
</commands>
</am-container-spec>
<unmanaged-AM>false</unmanaged-AM>
<max-app-attempts>1</max-app-attempts>
<resource>
<memory>5632</memory>
<vCores>1</vCores>
</resource>
<application-type>SPARK</application-type>
<keep-containers-across-application-attempts>false</keep-containers-across-application-attempts>
<application-tags>
<tag>"+sparkYarnTag+"</tag>
</application-tags>
<am-container-node-label-expression>appMngr</am-container-node-label-expression>
<log-aggregation-context/>
<attempt-failures-validity-interval>1</attempt-failures-validity-interval>
<reservation-id/>
</application-submission-context>
Шаг 5:
Подача заявки через REST http PUT:
private void submitApplication (String body, String userName) throws SMSparkManagerException {
HttpClient client = HttpClientBuilder.create().build();
HttpPost request = new HttpPost(uri+"?user.name="+userName);
try {
request.setEntity(new StringEntity(body, ContentType.APPLICATION_XML));
HttpResponse response = client.execute(request);
if (response.getStatusLine().getStatusCode()!=202) {
throw new SMSparkManagerException("The application could not be submitted to Yarn, response http code "+response.getStatusLine().getStatusCode());
}
} catch (UnsupportedEncodingException e) {
logger.error("The application Could not be submitted due to UnsupportedEncodingException in the provided body: "+body, e );
throw new SMSparkManagerException("Error in submitting application to yarn");
} catch (ClientProtocolException e) {
logger.error("The application Could not be submitted due to ClientProtocolException", e);
throw new SMSparkManagerException("Error in submitting application to yarn");
} catch (IOException e) {
logger.error("The application Could not be submitted due to IOException", e);
throw new SMSparkManagerException("Error in submitting application to yarn");
}
}