Это мой пример данных
val sc = spark.sparkContext
val myDataFrame = spark.range(20).toDF("mycol").repartition($"mycol")
myDataFrame.show(false)
Вывод :
+-----+
|mycol|
+-----+
|19 |
|0 |
|7 |
|6 |
|9 |
|17 |
|5 |
|1 |
|10 |
|3 |
|12 |
|8 |
|11 |
|2 |
|4 |
|13 |
|18 |
|14 |
|15 |
|16 |
+-----+
В приведенном выше коде при повторном разбиении столбца тогда будет создано 200 разделов, поскольку spark.sql.shuffle.partitions = 200
в том, что многие из них не используются или пустые разделы, так как данные представляют собой всего лишь 10 чисел (мы пытаемся уместить 20 номеров в 200 разделов, значит .... большинство разделов пустые .. ..: -))
1) Подготовьте длинную аккумуляторную переменную для быстрого подсчета непустых разделов.
2) Добавьте все непустые разделы в аккумуляторную переменную, например ниже пример.
val nonEmptyPartitions = sc.longAccumulator("nonEmptyPartitions")
myDataFrame.foreachPartition(partition =>
if (partition.length > 0) nonEmptyPartitions.add(1))
- отбрасывать непустые разделы (означает объединять их ... меньше перемешивания / минимальное перемешивание).
- распечатать их.
val finalDf = myDataFrame.coalesce(nonEmptyPartitions.value.toInt)
println(s"nonEmptyPart : ${nonEmptyPartitions.value.toInt}")
println(s"df.rdd.partitions.length : ${myDataFrame.rdd.getNumPartitions}")
println(s"finalDf.rdd.partitions.length : ${finalDf.rdd.getNumPartitions}")
распечатать их ...
Результат :
nonEmptyPart : 20
df.rdd.partitions.length : 200
finalDf.rdd.partitions.length : 20
Доказательство того, что все непустые разделы удалены . ..
myDataFrame.withColumn("partitionId", org.apache.spark.sql.functions.spark_partition_id)
.groupBy("partitionId")
.count
.show
Результат распечатанного количества записей по разделам:
+-----------+-----+
|partitionId|count|
+-----------+-----+
|128 |1 |
|190 |1 |
|140 |1 |
|164 |1 |
|5 |1 |
|154 |1 |
|112 |1 |
|107 |1 |
|4 |1 |
|49 |1 |
|69 |1 |
|77 |1 |
|45 |1 |
|121 |1 |
|143 |1 |
|58 |1 |
|11 |1 |
|150 |1 |
|68 |1 |
|116 |1 |
+-----------+-----+
Примечание:
* 104 9 *
Использование spark_partition_id
предназначено только для демонстрации / отладки, а не для производственных целей.
Я уменьшил 200 разделов (из-за переделки по столбцу) до 20 непустых разделов.
Вывод:
Наконец, вы избавились от лишних пустых разделов, на которых нет данных, и избежали ненужного расписания для фиктивных задач на пустых разделах.