Оптимизация скрипта Spark на локальный - PullRequest
0 голосов
/ 05 июля 2018

Я потянул свои волосы, пытаясь оптимизировать скрипт Spark, и он все еще невыносимо медленный (24 минуты для 600 МБ данных). Полный код здесь , но я постараюсь подвести итог в этом вопросе; пожалуйста, дайте мне знать, если вы видите какие-либо способы ускорить его.

Аппаратное обеспечение : один компьютер с 256 ГБ памяти и 32-ядерным процессором => необходимо использовать локальный как главный. Для моего проекта нужны local и local[*], но давайте сосредоточимся на local

Данные : 2 файла NetCDF (столбчатые данные); одна машина => нет HDFS

Анализ данных : читать все столбцы как Arrays -> ss.parallelize + zip -> преобразовать в DataFrame

Действия : show(), summary(min, max, mean, stddev), write, groupBy(), write

Как я запускаю : sbt assembly, чтобы создать толстую банку, исключая только саму искру +

spark-submit --master "local" --conf "spark.sql.shuffle.partitions=4" --driver-memory "10g" target/scala-2.11/spark-assembly-1.0.jar --partitions 4 --input ${input} --slice ${slice}

Оптимизации, которые я пробовал :

  • распараллелить RDD с тем же числом разделов, а также установить разделы DataFrame по умолчанию на то же число, чтобы минимизировать перемещение данных => казалось, помогло
  • разные номера разделов => 1, кажется, просто зависает, более 4, кажется, замедляют его (подчиняясь правилам numPartitions = ~ 4x число ядер и numPartitions = ~ data / 128MB)
  • читать все данные в драйвере как Scala Arrays -> transpose -> single RDD (в отличие от архивирования RDD) => медленнее
  • перераспределение только что прочитанных DataFrames в тех же столбцах и numPartitions, чтобы объединение не вызывало случайное перемешивание
  • кэширование фреймов данных, которые используются повторно

Код (несколько переименований и комментариев удалено):

private def readDataRDD(path: String, ss: SparkSession, dims: List[String], createIndex: Boolean, numPartitions: Int): DataFrame = {
  val file: NetcdfFile = NetcdfFile.open(path)
  val vars: util.List[Variable] = file.getVariables
  // split variables into dimensions and regular data
  val dimVars: Map[String, Variable] = vars.filter(v => dims.contains(v.getShortName)).map(v => v.getShortName -> v).toMap
  val colVars: Map[String, Variable] = vars.filter(v => !dims.contains(v.getShortName)).map(v => v.getShortName -> v).toMap

  val lon: Array[Float] = readVariable(dimVars(dims(0)))
  val lat: Array[Float] = readVariable(dimVars(dims(1)))
  val tim: Array[Float] = readVariable(dimVars(dims(2)))
  val dimsCartesian: Array[ListBuffer[_]] = cartesian(lon, lat, tim)

  // create the rdd with the dimensions (by transposing the cartesian product)
  var tempRDD: RDD[ListBuffer[_]] = ss.sparkContext.parallelize(dimsCartesian, numPartitions)
  // gather the names of the columns (in order)
  val names: ListBuffer[String] = ListBuffer(dims: _*)

  for (col <- colVars) {
    tempRDD = tempRDD.zip(ss.sparkContext.parallelize(readVariable(col._2), numPartitions)).map(t => t._1 :+ t._2)
    names.add(col._1)
  }

  if (createIndex) {
    tempRDD = tempRDD.zipWithIndex().map(t => t._1 :+ t._2.asInstanceOf[Float])
    names.add("index")
  }

  val finalRDD: RDD[Row] = tempRDD.map(Row.fromSeq(_))
  val df: DataFrame = ss.createDataFrame(finalRDD, StructType(names.map(StructField(_, FloatType, nullable = false))))

  val floatTimeToString = udf((time: Float) => {
    val udunits = String.valueOf(time.asInstanceOf[Int]) + " " + UNITS

    CalendarDate.parseUdunits(CALENDAR, udunits).toString.substring(0, 10)
  })

  df.withColumn("time", floatTimeToString(df("time")))
}

def main(args: Array[String]): Unit = {
  val spark: SparkSession = SparkSession.builder
    .appName("Spark Pipeline")
    .getOrCreate()

  val dimensions: List[String] = List("longitude", "latitude", "time")
  val numberPartitions = options('partitions).asInstanceOf[Int]
  val df1: DataFrame = readDataRDD(options('input) + "data1.nc", spark, dimensions, createIndex = true, numberPartitions)
    .repartition(numberPartitions, col("longitude"), col("latitude"), col("time"))
  val df2: DataFrame = readDataRDD(options('input) + "data2.nc", spark, dimensions, createIndex = false, numberPartitions)
    .repartition(numberPartitions, col("longitude"), col("latitude"), col("time"))

  var df: DataFrame = df1.join(df2, dimensions, "inner").cache()

  println(df.show())

  val slice: Array[String] = options('slice).asInstanceOf[String].split(":")
  df = df.filter(df("index") >= slice(0).toFloat && df("index") < slice(1).toFloat)
    .filter(df("tg") =!= -99.99f && df("pp") =!= -999.9f && df("rr") =!= -999.9f)
    .drop("pp_stderr", "rr_stderr", "index")
    .withColumn("abs_diff", abs(df("tx") - df("tn"))).cache()

  val df_agg = df.drop("longitude", "latitude", "time")
    .summary("min", "max", "mean", "stddev")
    .coalesce(1)
    .write
    .option("header", "true")
    .csv(options('output) + "agg")

  val computeYearMonth = udf((time: String) => {
    time.substring(0, 7).replace("-", "")
  })
  df = df.withColumn("year_month", computeYearMonth(df("time")))

  val columnsToAgg: Array[String] = Array("tg", "tn", "tx", "pp", "rr")
  val groupOn: Seq[String] = Seq("longitude", "latitude", "year_month")
  val grouped_df: DataFrame = df.groupBy(groupOn.head, groupOn.drop(1): _*)
    .agg(columnsToAgg.map(column => column -> "mean").toMap)
    .drop("longitude", "latitude", "year_month")

  val columnsToSum: Array[String] = Array("tg_mean", "tn_mean", "tx_mean", "rr_mean", "pp_mean")
  grouped_df
    .agg(columnsToSum.map(column => column -> "sum").toMap)
    .coalesce(1)
    .write
    .option("header", "true")
    .csv(options('output) + "grouped")

  spark.stop()
}

Есть идеи, как ускорить процесс?

Примечания :

  • local занимает 24 минуты; local[32] занимает 5 минут
  • да, Spark не создан для 1 машины, но одни и те же операции (однопоточные) в java или pandas занимают 10 и 40 секунд соответственно; огромная разница
  • в настоящее время не может просматривать веб-интерфейс для визуализации задач
  • данные 600 МБ являются подмножеством; полный набор данных ~ 50 ГБ
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...