В Spark, distinct
и вообще все операции агрегирования (например, groupBy
) не не сортируют данные. Это легко проверить, используя функцию explain
.
// Let's generate a df with 5 elements in [0, 4[ to have at least one duplicate
val data = spark.range(5).select(floor(rand() * 4) as "r")
data.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#105L], functions=[])
+- Exchange hashpartitioning(r#105L, 200)
+- *HashAggregate(keys=[r#105L], functions=[])
+- *Project [FLOOR((rand(7842501052366484791) * 5.0)) AS r#105L]
+- *Range (0, 10, step=1, splits=2)
HashAggregate
+ Exchange
означает, что элементы хэшируются и перетасовываются, так что элементы с одинаковым хешем находятся в одном разделе. Затем элементы с одинаковым хешем сравниваются и дедублируются. Поэтому данные не сортируются после процесса. Давайте проверим, что:
data.distinct.show()
+---+
| r|
+---+
| 0|
| 3|
| 2|
+---+
Давайте рассмотрим вашу озабоченность по поводу производительности сейчас. Если вы сортируете после дедупликации, вот что происходит.
data.distinct.orderBy("r").explain
== Physical Plan ==
*Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
+- *HashAggregate(keys=[r#227L], functions=[])
+- Exchange hashpartitioning(r#227L, 200)
+- *HashAggregate(keys=[r#227L], functions=[])
+- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
+- *Range (0, 5, step=1, splits=2)
Мы можем видеть, что данные перемешиваются для дедупликации (Exchange hashpartitioning
) и снова перемешиваются для сортировки (Exchange rangepartitioning
). Это довольно дорого. Это связано с тем, что для сортировки требуется случайное перемещение по диапазону, чтобы элементы в одном и том же диапазоне попадали в один и тот же раздел, который затем можно сортировать. Тем не менее, мы можем быть умнее и сортировать перед дедупликацией:
data.orderBy("r").distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#227L], functions=[])
+- *HashAggregate(keys=[r#227L], functions=[])
+- *Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
+- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
+- *Range (0, 5, step=1, splits=2)
Остался только один обмен. Действительно, spark знает, что после случайного перемещения по диапазону дублированные элементы находятся в одном разделе. Поэтому он не вызывает новую случайную смену.