Как проверить количество разделов в Spark DataFrame, не неся стоимости .rdd - PullRequest
0 голосов
/ 19 января 2019

Существует ряд вопросов о том, как получить количество разделов для RDD и / или DataFrame: ответы всегда следующие:

 rdd.getNumPartitions

или

 df.rdd.getNumPartitions

К сожалению, это дорогая операция на DataFrame, потому что

 df.rdd

требует преобразования из DataFrame в rdd.Это порядка времени, необходимого для запуска

 df.count

Я пишу логику, которая опционально repartition или coalesce 'sa DataFrame - на основебыло ли число разделов current в диапазоне допустимых значений или вместо этого или ниже их.

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap{ minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse( maxPartitions.map{ maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }

Но мы не можем позволить себе оплачивать rdd.getNumPartitions за каждый DataFrame таким образом.

Разве нет никакого способаполучить эту информацию - например, из запроса онлайн / временного catalog для таблицы registered, может быть?

Обновление Графический интерфейс Spark показал, что операция DataFrame.rdd занимает столько времени, сколькосамый длинный sql в работе.Я перезапущу задание и немного прикреплю скриншот здесь.

Ниже приведен всего лишь контрольный пример : он использует небольшую часть размера данных в производственном.Самая длинная sql составляет всего пять минут - и вот она на пути к тому, чтобы потратить такое количество времени , а также (обратите внимание, что sql - это , а не :впоследствии он также должен выполняться, таким образом эффективно удваивая совокупное время выполнения.)

enter image description here

Мы можем видеть, что операция .rdd в строке DataFrameUtils30 (показано во фрагменте выше) занимает 5,1 минуты - и все же операция save все еще заняла 5,2 минуты спустя - то есть мы не сохранили любое время, выполнив .rddпо срокам исполнения последующих save.

Ответы [ 2 ]

0 голосов
/ 19 января 2019

По моему опыту df.rdd.getNumPartitions очень быстро, я никогда не сталкивался с тем, чтобы брать это больше секунды или около того.

Кроме того, вы также можете попробовать

val numPartitions: Long = df
      .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()

, что позволит избежать использования .rdd

0 голосов
/ 19 января 2019

В rdd.getNumPartitions отсутствует внутренняя стоимость компонента rdd, поскольку возвращенное значение RDD никогда не оценивается.

Хотя вы можете легко определить это эмпирически, используя отладчик (я оставлю это какупражнение для читателя) или установление того, что в базовом сценарии не запускаются никакие задания

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

, этого может быть недостаточно, чтобы убедить вас.Итак, давайте подойдем к этому более систематическим образом:

  • rdd возвращает MapPartitionRDD (ds, как определено выше):

    scala> ds.rdd.getClass
    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
    
  • RDD.getNumPartitions вызывает RDD.partitions.

  • В сценарии без контрольной точки RDD.partitions вызывает getPartitions (не стесняйтесь отслеживатьпуть к контрольной точке).
  • RDD.getPartitions является абстрактным .
  • Таким образом, в данном случае используется фактическая реализация MapPartitionsRDD.getPartitions, которая просто делегируетвызов родителю .
  • Между rdd и источником есть только MapPartitionsRDD.

    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
    

    Аналогично, если Dataset содержал обмен, мыбудет следовать за родителями до ближайшего шаффла:

    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    

    Обратите внимание, что этот случай особенно интересен, потому что мы фактически вызвали работу:

    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    

    Это потому, что мы столкнулись как сценарийгде разделы не могут быть определены статически (см. Количество разделов данных после сортировки? и Почему преобразование sortByВы можете запустить задание Spark? ).

    В этом сценарии getNumPartitions также вызовет задание:

    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    

    , однако это не означает, что наблюдаемая стоимость каким-то образомсвязанные с .rdd вызовом.Вместо этого это внутренняя стоимость поиска partitions в случае, когда нет статической формулы (например, в некоторых форматах ввода Hadoop, где требуется полное сканирование данных).

Обратите внимание, что приведенные здесь пункты не должны быть экстраполированы на другие приложения Dataset.rdd.Например, ds.rdd.count будет действительно дорогим и расточительным.

...