Разве () сортирует набор данных? - PullRequest
3 голосов
/ 04 июня 2019

Я пишу приложение для предварительной обработки, которое, помимо других преобразований и действий, сортирует набор данных в конце, прежде чем записать его в HDFS. Новый запрос требует от меня дублирования набора данных, поэтому я хотел бы сделать это в один этап с сортировкой. Насколько я понимаю, для эффективного устранения дубликатов необходима сортировка (возможно, я ошибаюсь в этом, мало исследовал ее, просто кажется естественным).

По некоторым причинам (столбец MapType в выходной схеме) я сначала проверил distinct на более ранней стадии, чем sort, думая, что позже я избавлюсь от MapType столбцов, чтобы объединить их вместе. .

Spark UI output

Что произошло, так это то, что второй этап сортировки был пропущен, как если бы набор данных уже был отсортирован. Это имеет смысл для меня, но не поддерживается нигде в документации (AFAIK), и я не знаю, является ли ожидаемое поведение стабильным (я не хочу доводить это до производства, просто чтобы понять, что я внезапно делать 2 дорогих этапа: sort и distinct оба). У кого-нибудь есть еще идеи о том, как sort и / или distinct реализованы?

1 Ответ

7 голосов
/ 04 июня 2019

В Spark, distinct и вообще все операции агрегирования (например, groupBy) не не сортируют данные. Это легко проверить, используя функцию explain.

// Let's generate a df with 5 elements in [0, 4[ to have at least one duplicate
val data = spark.range(5).select(floor(rand() * 4) as "r")

data.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#105L], functions=[])
+- Exchange hashpartitioning(r#105L, 200)
   +- *HashAggregate(keys=[r#105L], functions=[])
      +- *Project [FLOOR((rand(7842501052366484791) * 5.0)) AS r#105L]
         +- *Range (0, 10, step=1, splits=2)

HashAggregate + Exchange означает, что элементы хэшируются и перетасовываются, так что элементы с одинаковым хешем находятся в одном разделе. Затем элементы с одинаковым хешем сравниваются и дедублируются. Поэтому данные не сортируются после процесса. Давайте проверим, что:

data.distinct.show()
+---+                                                                           
|  r|
+---+
|  0|
|  3|
|  2|
+---+

Давайте рассмотрим вашу озабоченность по поводу производительности сейчас. Если вы сортируете после дедупликации, вот что происходит.

data.distinct.orderBy("r").explain
== Physical Plan ==
*Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[r#227L], functions=[])
      +- Exchange hashpartitioning(r#227L, 200)
         +- *HashAggregate(keys=[r#227L], functions=[])
            +- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
               +- *Range (0, 5, step=1, splits=2)

Мы можем видеть, что данные перемешиваются для дедупликации (Exchange hashpartitioning) и снова перемешиваются для сортировки (Exchange rangepartitioning). Это довольно дорого. Это связано с тем, что для сортировки требуется случайное перемещение по диапазону, чтобы элементы в одном и том же диапазоне попадали в один и тот же раздел, который затем можно сортировать. Тем не менее, мы можем быть умнее и сортировать перед дедупликацией:

data.orderBy("r").distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#227L], functions=[])
+- *HashAggregate(keys=[r#227L], functions=[])
   +- *Sort [r#227L ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
         +- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
            +- *Range (0, 5, step=1, splits=2)

Остался только один обмен. Действительно, spark знает, что после случайного перемещения по диапазону дублированные элементы находятся в одном разделе. Поэтому он не вызывает новую случайную смену.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...