Количество разделов данных после сортировки? - PullRequest
0 голосов
/ 14 декабря 2018

Как spark определяет количество разделов после использования orderBy?Я всегда думал, что результирующий фрейм данных имеет spark.sql.shuffle.partitions, но, похоже, это не так:

val df = (1 to 10000).map(i => ("a",i)).toDF("n","i").repartition(10).cache

df.orderBy($"i").rdd.getNumPartitions // = 200 (=spark.sql.shuffle.partitions)
df.orderBy($"n").rdd.getNumPartitions // = 2 

В обоих случаях, спарк делает +- Exchange rangepartitioning(i/n ASC NULLS FIRST, 200), так как же полученное число разделов ввторой случай будет 2?

Ответы [ 2 ]

0 голосов
/ 01 января 2019

Я провел различные тесты, чтобы взглянуть на это более эмпирически, в дополнение к рассмотрению разбивки по диапазонам для сортировки - в этом суть вопроса.См. Как работает разделитель диапазонов в Spark? .

Экспериментируя как с 1 отличным значением для «n», как в примере в вопросе, так и с более чем 1 таким отдельным значением для«n», то с использованием различных размеров данных с df.orderBy ($ «n») :

  • ясно, что при расчете определяется количество разделовкоторые будут содержать диапазоны данных для сортировки впоследствии через mapPartitions,
  • , которая основана на выборке из существующих разделов до вычисления некоторого эвристически оптимального количества разделов для этих вычисленных диапазонов,
  • в большинстве случаев вычислит и, таким образом, создаст N + 1 разделов , при этом раздел N + 1 будет пустым .

Тот факт, что выделенный дополнительный разделпочти всегда пустое заставляет меня думать, что в кодировании в некотором роде есть ошибка вычисления, другими словами, небольшая ошибка imho.

Я основываю это на следующем простом тесте, wон возвращает то, что RR, я подозреваю, будет правильным числом разделов:

val df_a1 = (1 to 1).map(i => ("a",i)).toDF("n","i").cache
val df_a2 = (1 to 1).map(i => ("b",i)).toDF("n","i").cache
val df_a3 = (1 to 1).map(i => ("c",i)).toDF("n","i").cache
val df_b = df_a1.union(df_a2)
val df_c = df_b.union(df_a3)

df_c.orderBy($"n")
 .rdd
 .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
 .toDF("partition_number","number_of_records")
 .show(100,false)

возвращает:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|0               |1                |
|1               |1                |
|2               |1                |
+----------------+-----------------+

Этот пример расчета границы довольно прост.Как только я использую от 1 до 2 или 1 .. N для любого из "n", получаются дополнительные пустые разделы:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|0               |2                |
|1               |1                |
|2               |1                |
|3               |0                |
+----------------+-----------------+

Сортировкатребует, чтобы все данные для данного "n" или набора "n" находились в одном разделе.

0 голосов
/ 30 декабря 2018

spark.sql.shuffle.partitions используется в качестве верхней границы.Конечное число разделов 1 <= partitions <= spark.sql.shuffle.partition.


Как вы уже упоминали, сортировка в Spark выполняется через RangePartitioner.Он пытается разделить ваш набор данных на определенное число (spark.sql.shuffle.partition) примерно равных диапазонов.

Существует гарантия того, что равные значения будут в одном и том же разделе после разбиения.Стоит проверить документацию класса RangePartitioning (не является частью общедоступного API):

...

Все строки, где находятся выражения в orderingоценивать одинаковые значения будут в одном и том же разделе

И если число различных значений порядка меньше требуемого количества разделов, т.е. число возможных диапазонов меньше spark.sql.shuffle.partition,в итоге вы получите меньшее количество разделов.Кроме того, вот цитата из RangePartitioner Scaladoc:

Фактическое число разделов, созданных RangePartitioner, может не совпадать с параметром разделов, в случае, когдачисло выборочных записей меньше значения разделов.

Возвращаясь к вашему примеру, n является константой ("a") и не может быть разбит на части.С другой стороны, i может иметь 10 000 возможных значений и разбивается на 200 (=spark.sql.shuffle.partition) диапазонов или разделов.

Обратите внимание, что это верно только для API DataFrame / Dataset.При использовании RDD sortByKey можно либо указать число разделов явно, либо Spark будет использовать текущее количество разделов.

См. Также:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...