Существует ряд вопросов о том, как получить количество разделов для RDD
и / или DataFrame
: ответы всегда следующие:
rdd.getNumPartitions
или
df.rdd.getNumPartitions
К сожалению, это дорогая операция на DataFrame
, потому что
df.rdd
требует преобразования из DataFrame
в rdd
.Это порядка времени, необходимого для запуска
df.count
Я пишу логику, которая опционально repartition
или coalesce
'sa DataFrame
- на основебыло ли число разделов current в диапазоне допустимых значений или вместо этого или ниже их.
def repartition(inDf: DataFrame, minPartitions: Option[Int],
maxPartitions: Option[Int]): DataFrame = {
val inputPartitions= inDf.rdd.getNumPartitions // EXPENSIVE!
val outDf = minPartitions.flatMap{ minp =>
if (inputPartitions < minp) {
info(s"Repartition the input from $inputPartitions to $minp partitions..")
Option(inDf.repartition(minp))
} else {
None
}
}.getOrElse( maxPartitions.map{ maxp =>
if (inputPartitions > maxp) {
info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
inDf.coalesce(maxp)
} else inDf
}.getOrElse(inDf))
outDf
}
Но мы не можем позволить себе оплачивать rdd.getNumPartitions
за каждый DataFrame
таким образом.
Разве нет никакого способаполучить эту информацию - например, из запроса онлайн / временного catalog
для таблицы registered
, может быть?
Обновление Графический интерфейс Spark показал, что операция DataFrame.rdd занимает столько времени, сколькосамый длинный sql в работе.Я перезапущу задание и немного прикреплю скриншот здесь.
Ниже приведен всего лишь контрольный пример : он использует небольшую часть размера данных в производственном.Самая длинная sql
составляет всего пять минут - и вот она на пути к тому, чтобы потратить такое количество времени , а также (обратите внимание, что sql
- это , а не :впоследствии он также должен выполняться, таким образом эффективно удваивая совокупное время выполнения.)
Мы можем видеть, что операция .rdd
в строке DataFrameUtils
30 (показано во фрагменте выше) занимает 5,1 минуты - и все же операция save
все еще заняла 5,2 минуты спустя - то есть мы не сохранили любое время, выполнив .rdd
по срокам исполнения последующих save
.