Неверный код состояния «400» из .. ошибка полезной нагрузки: «требование не выполнено: сеанс не активен - PullRequest
0 голосов
/ 20 июня 2019

Я запускаю сценарии Pyspark для записи фрейма данных в csv в блокноте jupyter, как показано ниже:

df.coalesce(1).write.csv('Data1.csv',header = 'true')

Через час времени выполнения я получаю сообщение об ошибке ниже.

Error: Invalid status code from http://.....session isn't active.

Мой конфиг такой:

spark.conf.set("spark.dynamicAllocation.enabled","true")
spark.conf.set("shuffle.service.enabled","true")
spark.conf.set("spark.dynamicAllocation.minExecutors",6)
spark.conf.set("spark.executor.heartbeatInterval","3600s")
spark.conf.set("spark.cores.max", "4")
spark.conf.set("spark.sql.tungsten.enabled", "true")
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.app.id", "Logs")
spark.conf.set("spark.io.compression.codec", "snappy")
spark.conf.set("spark.rdd.compress", "true")
spark.conf.set("spark.executor.instances", "6")
spark.conf.set("spark.executor.memory", '20g')
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.driver.allowMultipleContexts", "true")
spark.conf.set("spark.master", "yarn")
spark.conf.set("spark.driver.memory", "20G")
spark.conf.set("spark.executor.instances", "32")
spark.conf.set("spark.executor.memory", "32G")
spark.conf.set("spark.driver.maxResultSize", "40G")
spark.conf.set("spark.executor.cores", "5")

Я проверил узлы контейнера, и там есть ошибка:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed:container_e836_1556653519610_3661867_01_000005 on host: ylpd1205.kmdc.att.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143

Не удалось выяснить проблему.

1 Ответ

0 голосов
/ 21 июня 2019

Я не очень разбираюсь в pyspark, но в scala решение будет включать что-то вроде этого

Сначала нам нужно создать метод для создания файла заголовка. так было бы что-то вроде этого

def createHeaderFile(headerFilePath: String, colNames: Array[String]) {

//format header file path
val fileName = "dfheader.csv"
val headerFileFullName = "%s/%s".format(headerFilePath, fileName)

//write file to hdfs one line after another
val hadoopConfig = new Configuration()
val fileSystem = FileSystem.get(hadoopConfig)
val output = fileSystem.create(new Path(headerFileFullName))
val writer = new PrintWriter(output)

for (h <- colNames) {
  writer.write(h + ",")
}
writer.write("\n")
writer.close()

}

вам также понадобится метод для вызова hadoop для объединения ваших файлов деталей, который будет записан методом df.write, так что это будет что-то вроде этого

  def mergeOutputFiles(sourcePaths: String, destLocation: String): Unit = {

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
// in case of array[String] use   for loop to iterate over the muliple source paths  if not use the code below 
//   for (sourcePath <- sourcePaths) {
  //Get the path under destination where the partitioned files are temporarily stored
  val pathText = sourcePaths.split("/")
  val destPath = "%s/%s".format(destLocation, pathText.last)

  //Merge files into 1
  FileUtil.copyMerge(hdfs, new Path(sourcePath), hdfs, new Path(destPath), true, hadoopConfig, null)
 // }
//delete the temp partitioned files post merge complete
val tempfilesPath = "%s%s".format(destLocation, tempOutputFolder)
hdfs.delete(new Path(tempfilesPath), true)

}

вот метод генерации выходных файлов или метод df.write, при котором вы передаете свой огромный DF для записи в hadoop HDFS

  def generateOutputFiles( processedDf: DataFrame, opPath: String, tempOutputFolder: String,
                       spark: SparkSession): String = {

import spark.implicits._

  val fileName = "%s%sNameofyourCsvFile.csv".format(opPath, tempOutputFolder)
  //write as csv to output directory and add file path to array to be sent for merging and create header file
  processedDf.write.mode("overwrite").csv(fileName)

  createHeaderFile(fileName, processedDf.columns)
  //create an array of the partitioned file paths

   outputFilePathList = fileName

  // you can use array of string or string only depending on  if the output needs to get divided in multiple file based on some parameter  in that case chagne the signature ot Array[String] as output
  // add below code 
 // outputFilePathList(counter) = fileName
  // just use a loop in the above  and increment it 
  //counter += 1


return outputFilePathList

}

со всеми методами, определенными здесь, как вы можете их реализовать

def processyourlogic( your parameters  if any):Dataframe=
{
.... your logic to do whatever needs to be done to your data
}

при условии, что вышеупомянутый метод возвращает фрейм данных, вот как вы можете собрать все вместе

val yourbigDf= processyourlogic(your parameters) // returns DF
yourbigDf.cache // caching just in case you need it 
val outputPathFinal=" location where you want your file to be saved"
 val tempOutputFolderLocation = "temp/"
val partFiles= generateOutputFiles( yourbigDf, outputPathFinal,tempOutputFolderLocation ,spark)
mergeOutputFiles(partFiles, outputPathFinal)

Дайте мне знать, если у вас есть другие вопросы, связанные с этим. в идеале это должен быть отдельный вопрос. В дальнейшем, пожалуйста, откройте новый вопрос, если ответ, который вы ищете, отличается от первоначального вопроса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...