Я не очень разбираюсь в pyspark, но в scala решение будет включать что-то вроде этого
Сначала нам нужно создать метод для создания файла заголовка. так было бы что-то вроде этого
def createHeaderFile(headerFilePath: String, colNames: Array[String]) {
//format header file path
val fileName = "dfheader.csv"
val headerFileFullName = "%s/%s".format(headerFilePath, fileName)
//write file to hdfs one line after another
val hadoopConfig = new Configuration()
val fileSystem = FileSystem.get(hadoopConfig)
val output = fileSystem.create(new Path(headerFileFullName))
val writer = new PrintWriter(output)
for (h <- colNames) {
writer.write(h + ",")
}
writer.write("\n")
writer.close()
}
вам также понадобится метод для вызова hadoop для объединения ваших файлов деталей, который будет записан методом df.write, так что это будет что-то вроде этого
def mergeOutputFiles(sourcePaths: String, destLocation: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
// in case of array[String] use for loop to iterate over the muliple source paths if not use the code below
// for (sourcePath <- sourcePaths) {
//Get the path under destination where the partitioned files are temporarily stored
val pathText = sourcePaths.split("/")
val destPath = "%s/%s".format(destLocation, pathText.last)
//Merge files into 1
FileUtil.copyMerge(hdfs, new Path(sourcePath), hdfs, new Path(destPath), true, hadoopConfig, null)
// }
//delete the temp partitioned files post merge complete
val tempfilesPath = "%s%s".format(destLocation, tempOutputFolder)
hdfs.delete(new Path(tempfilesPath), true)
}
вот метод генерации выходных файлов или метод df.write, при котором вы передаете свой огромный DF для записи в hadoop HDFS
def generateOutputFiles( processedDf: DataFrame, opPath: String, tempOutputFolder: String,
spark: SparkSession): String = {
import spark.implicits._
val fileName = "%s%sNameofyourCsvFile.csv".format(opPath, tempOutputFolder)
//write as csv to output directory and add file path to array to be sent for merging and create header file
processedDf.write.mode("overwrite").csv(fileName)
createHeaderFile(fileName, processedDf.columns)
//create an array of the partitioned file paths
outputFilePathList = fileName
// you can use array of string or string only depending on if the output needs to get divided in multiple file based on some parameter in that case chagne the signature ot Array[String] as output
// add below code
// outputFilePathList(counter) = fileName
// just use a loop in the above and increment it
//counter += 1
return outputFilePathList
}
со всеми методами, определенными здесь, как вы можете их реализовать
def processyourlogic( your parameters if any):Dataframe=
{
.... your logic to do whatever needs to be done to your data
}
при условии, что вышеупомянутый метод возвращает фрейм данных, вот как вы можете собрать все вместе
val yourbigDf= processyourlogic(your parameters) // returns DF
yourbigDf.cache // caching just in case you need it
val outputPathFinal=" location where you want your file to be saved"
val tempOutputFolderLocation = "temp/"
val partFiles= generateOutputFiles( yourbigDf, outputPathFinal,tempOutputFolderLocation ,spark)
mergeOutputFiles(partFiles, outputPathFinal)
Дайте мне знать, если у вас есть другие вопросы, связанные с этим. в идеале это должен быть отдельный вопрос. В дальнейшем, пожалуйста, откройте новый вопрос, если ответ, который вы ищете, отличается от первоначального вопроса.