Как сбалансировать данные Spark DataFrame перед выводом partitionBy - PullRequest
1 голос
/ 21 апреля 2020

У меня есть DataFrame с геопространственными данными. Я хочу разделить его по столбцам: источник данных id и quadkeys.

Основная цель - иметь на одной руке минимальное количество файлов не больше определенного размера.

С другой стороны, я хочу оптимизировать разделы в памяти (если это будет слишком много, я могу столкнуться с исключением из памяти).

Другими словами, у меня есть qk = 0,1,2,3 и следующее число записей для каждого quadkey: 0 (1000), 1 (1000), 2 (10000), 3(100000). At output files I desire to have next files (not more than 5000 records on each): 0 (1000) 1 (1000) 2_1 (5000) 2_2 (5000) 3_1 (5000) ... 3_10 (5000)

В качестве решения я строю поиск распределения и соли. Мне интересно, что было бы лучшим решением для этого.

val MAX_PARTITION_ROWS = 3000000

val pid = "sourceid"
val qk = "qk"
val distributionDf: Array[((Int, String), Int)] = stage.
  select(col(pid), col(qk), lit(1L) as "cnt").
  groupBy(pid,qk).
  agg(sum("cnt") as "sum").
  rdd.map(r=>((r.getInt(0), r.getString(1))->r.getLong(2).toInt)).collect
LOG.info(s"QK distribution\nQK |PID |Count |Partitions number")
LOG.info(s"${distributionDf.sortBy(-_._2).map(x=>s"${x._1._2} |${x._1._1} | ${x._2} | ${x._2/MAX_PARTITION_ROWS}").mkString("\n")}")

val r = scala.util.Random
val distributionMap = distributionDf map { case (k, v) => k -> (v/MAX_PARTITION_ROWS+1) }toMap
val saltUdf = udf((pid: Int, qk: String) => {
  val dev = distributionMap.getOrElse((pid, qk), 1)
  r.nextInt(dev)
})

val partitionsNumber = if (distributionMap.isEmpty) 1 else distributionMap.values.sum

stage.withColumn("salt", saltUdf(col(pid),col(qk)))
  .repartition(partitionsNumber, col(pid), col(qk), col("salt"))
  .drop("salt")
  .write.partitionBy(pid, qk)
  .format("parquet")
  .option("maxRecordsPerFile", (MAX_PARTITION_ROWS*1.2).toInt)
  .option("compression", "gzip")
  .save(destUrl)

1 Ответ

0 голосов
/ 21 апреля 2020

Вы можете использовать maxRecordsPerFile как задокументировано здесь

Пример реализации

val df = spark.range(100).coalesce(1)
df.write.option("maxRecordsPerFile", 50).save(filePath)
...