Первый вариант состоит в том, чтобы сделать большой файл из небольших файлов паркета на исходном уровне, это объединить их в несколько файлов размером> 128 МБ или любого размера, который вы хотели
как объединить несколько файлов паркета в один файл паркета с помощью команды linux или hdfs?
Второй вариант т. Е. С использованием spark: читать небольшие файлы паркета, а затем до фактической обработки данныхобрабатывать логику с помощью spark и записывать их в файлы относительно больших размеров, как вы ожидаете (принимая во внимание факторы производительности)
Второй вариант :
Даже если вы не знаете, какова ваша конфигурация заданий на искру ... Но в целом coalesce
должен работать .... попробуйте, как показано ниже (master -> local, но измените его на пряжу для вашего приложения), который работал для меня.здесь, в этом примере, я взял маленькие файлы "./userdata*.parquet" (5 маленьких файлов размером около 110 КБ) в каталоге src / main / resources и слил их в финальные 2 файла с coalesce
...
Подход: Считайте каждый файл паркета как кадр данных, а затем объедините, чтобы создать один кадр данных, а затем coalesce
it.
package com.examples
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scala.collection.mutable
/** *
* take small pegs and make a large peg
* and coalesce it
*
* @author : Ram Ghadiyaram
*/
object ParquetPlay extends Logging {
Logger.getLogger("org").setLevel(Level.OFF)
//public FileStatus[] globStatus(Path pathPattern) throws IOException
def main(args: Array[String]): Unit = {
val appName = if (args.length >0) args(0) else this.getClass.getName
val spark: SparkSession = SparkSession.builder
.config("spark.master", "local")
.appName(appName)
.getOrCreate()
val fs = FileSystem.get(new Configuration())
val files = fs.globStatus(new Path("./userdata*.parquet")).map(_.getPath.toString)
val dfSeq = mutable.MutableList[DataFrame]()
println(dfSeq)
println(files.length)
files.foreach(x => println(x))
val newDFs = files.map(dir => {
dfSeq += spark.read.parquet(dir).toDF()
})
println(dfSeq.length)
val finalDF = dfSeq.reduce(_ union _)
.toDF
finalDF.show(false)
println(System.getProperty("java.io.tmpdir"))
println(System.getProperties.toString)
finalDF.coalesce(2)
.write
.mode(SaveMode.Overwrite)
.parquet(s"${System.getProperty("java.io.tmpdir")}/final.parquet")
println("done")
}
}
Результат: почти равный размер2 файла, как показано ниже ... здесь в примере снова генерируются небольшие файлы, но в вашем случае, поскольку у вас есть размер 500 КБ и около 600 файлов, вы можете увидеть размер файла, и вы можете решить coalesce
(количество разделов вашегоожидание)
Третий вариант: Как Мин (оригинальный постер), упомянутый в комментарии ... Это могут быть большие файлы с высокой степенью сжатия, которые становятся маленькими после сжатия, может быть причиной этого.