На вопрос нельзя ответить да или нет , так как ответ зависит от деталей DataFrames.
Производительность объединения зависит от некоторых хорошихчасть на вопрос, сколько тасования необходимо для его выполнения. Если обе стороны объединения разделены одним и тем же столбцом (столбцами), соединение будет быстрее. Эффект разделения можно увидеть, посмотрев план выполнения объединения.
Мы создаем два кадра данных df1
и df2
со столбцами a
, b
, c
и d
:
val sparkSession = ...
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import sparkSession.implicits._
val cols = Seq("a","b","c")
def createDf = (1 to 3).map(i => (i,i,i)).toDF(cols:_*).withColumn("d", concat_ws("_", cols.map(col):_*))
val df1 = createDf
val df2 = createDf
df1
и df2
выглядят одинаково:
+---+---+---+-----+
| a| b| c| d|
+---+---+---+-----+
| 1| 1| 1|1_1_1|
| 2| 2| 2|2_2_2|
| 3| 3| 3|3_3_3|
+---+---+---+-----+
Когда мы разбиваем оба DataFrames на столбец d
и используем этот столбец в качестве условия соединения
df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), "d").explain()
мы получаем план выполнения
== Physical Plan ==
*(3) Project [d#13, a#7, b#8, c#9, a#25, b#26, c#27]
+- *(3) SortMergeJoin [d#13], [d#31], Inner
:- *(1) Sort [d#13 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(d#13, 4)
: +- LocalTableScan [a#7, b#8, c#9, d#13]
+- *(2) Sort [d#31 ASC NULLS FIRST], false, 0
+- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(d#13, 4)
Разделение обоих фреймов данных на d
, но объединение a
, b
и c
df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), cols).explain()
приводит к плану выполнения
== Physical Plan ==
*(3) Project [a#7, b#8, c#9, d#13, d#31]
+- *(3) SortMergeJoin [a#7, b#8, c#9], [a#25, b#26, c#27], Inner
:- *(1) Sort [a#7 ASC NULLS FIRST, b#8 ASC NULLS FIRST, c#9 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(a#7, b#8, c#9, 200)
: +- Exchange hashpartitioning(d#13, 4)
: +- LocalTableScan [a#7, b#8, c#9, d#13]
+- *(2) Sort [a#25 ASC NULLS FIRST, b#26 ASC NULLS FIRST, c#27 ASC NULLS FIRST], false, 0
+- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(a#7, b#8, c#9, 200)
, который содержит на Exchange hashpartitioning
больше, чем первый план. В этом случае объединение a
, b
, c
будет медленнее.
С другой стороны, если кадры данных разделены на a
, b
и c
объединение a
, b
, c
будет быстрее объединения d
.