Spark медленно перераспределяет много маленьких файлов - PullRequest
0 голосов
/ 02 октября 2018

Я пытаюсь прочитать папку, состоящую из множества небольших паркетных файлов: 600 файлов по 500 КБ каждый.И затем repartition их в 2 файла.

val df = spark.read.parquet("folder")
df.repartition(2).write.mode("overwrite").parquet("output_folder")

И это ужасно медленно, до 10 минут.Из интерфейса spark я вижу, что 2 исполнителя выполняют 2 задачи.Я даю каждому исполнителю 10 ГБ памяти.

enter image description here

Так в чем причина медленной скорости?Это из-за дискового ввода-вывода?И как я могу улучшить производительность в этом случае.

Редактировать : Я также пытался использовать coalesce, и производительность не выглядит иначе.

1 Ответ

0 голосов
/ 03 октября 2018

Первый вариант состоит в том, чтобы сделать большой файл из небольших файлов паркета на исходном уровне, это объединить их в несколько файлов размером> 128 МБ или любого размера, который вы хотели

как объединить несколько файлов паркета в один файл паркета с помощью команды linux или hdfs?

Второй вариант т. Е. С использованием spark: читать небольшие файлы паркета, а затем до фактической обработки данныхобрабатывать логику с помощью spark и записывать их в файлы относительно больших размеров, как вы ожидаете (принимая во внимание факторы производительности)

Второй вариант :

Даже если вы не знаете, какова ваша конфигурация заданий на искру ... Но в целом coalesce должен работать .... попробуйте, как показано ниже (master -> local, но измените его на пряжу для вашего приложения), который работал для меня.здесь, в этом примере, я взял маленькие файлы "./userdata*.parquet" (5 маленьких файлов размером около 110 КБ) в каталоге src / main / resources и слил их в финальные 2 файла с coalesce ...

Подход: Считайте каждый файл паркета как кадр данных, а затем объедините, чтобы создать один кадр данных, а затем coalesce it.

  package com.examples

import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.mutable

/** *
  * take small pegs and make a large peg
  * and coalesce it
  *
  * @author : Ram Ghadiyaram
  */
object ParquetPlay extends Logging {
  Logger.getLogger("org").setLevel(Level.OFF)


  //public FileStatus[] globStatus(Path pathPattern) throws IOException
  def main(args: Array[String]): Unit = {


 val appName = if (args.length >0) args(0) else this.getClass.getName
    val spark: SparkSession = SparkSession.builder
      .config("spark.master", "local")
      .appName(appName)
      .getOrCreate()
    val fs = FileSystem.get(new Configuration())

    val files = fs.globStatus(new Path("./userdata*.parquet")).map(_.getPath.toString)
    val dfSeq = mutable.MutableList[DataFrame]()
    println(dfSeq)
    println(files.length)
    files.foreach(x => println(x))
    val newDFs = files.map(dir => {
      dfSeq += spark.read.parquet(dir).toDF()
    })
    println(dfSeq.length)
    val finalDF = dfSeq.reduce(_ union _)
      .toDF
    finalDF.show(false)
    println(System.getProperty("java.io.tmpdir"))
    println(System.getProperties.toString)
    finalDF.coalesce(2)
      .write
      .mode(SaveMode.Overwrite)
      .parquet(s"${System.getProperty("java.io.tmpdir")}/final.parquet")
    println("done")
  }
}

Результат: почти равный размер2 файла, как показано ниже ... здесь в примере снова генерируются небольшие файлы, но в вашем случае, поскольку у вас есть размер 500 КБ и около 600 файлов, вы можете увидеть размер файла, и вы можете решить coalesce (количество разделов вашегоожидание)

enter image description here

Третий вариант: Как Мин (оригинальный постер), упомянутый в комментарии ... Это могут быть большие файлы с высокой степенью сжатия, которые становятся маленькими после сжатия, может быть причиной этого.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...