производительность spark join: несколько столбцов против одного столбца - PullRequest
1 голос
/ 08 октября 2019

Если у меня есть столбцы [a,b,c] в df1 и [a,b,c] в df2, а также столбец d, в обоих случаях d=concat_ws('_', *[a,b,c]) будет разница в производительности между:

  1. df1.join(df2, [a,b,c])
  2. df1.join(df2, d)

?

Ответы [ 2 ]

1 голос
/ 09 октября 2019

Я подозреваю, что объединение без объединения будет быстрее, потому что, вероятно, дешевле просто хешировать отдельные строки вместо конкатенации, а затем хешировать. Первый включает в себя меньшее количество java-объектов, которые должны быть GC'd, но это не полный ответ.

Имейте в виду, что это может не быть шагом, ограничивающим производительность вашего запроса, и поэтому в любом случаетак же быстро. Когда дело доходит до настройки производительности, лучше всего тестировать, а не гадать без данных.

Кроме того, как уже упоминалось выше, оставление столбцов без сцепления дает оптимизатору возможность исключить обмен при объединении, если входные данные уже разделеныправильно.

df1.join(df2, [a,b,c])
df1.join(df2, d)
1 голос
/ 09 октября 2019

На вопрос нельзя ответить да или нет , так как ответ зависит от деталей DataFrames.

Производительность объединения зависит от некоторых хорошихчасть на вопрос, сколько тасования необходимо для его выполнения. Если обе стороны объединения разделены одним и тем же столбцом (столбцами), соединение будет быстрее. Эффект разделения можно увидеть, посмотрев план выполнения объединения.

Мы создаем два кадра данных df1 и df2 со столбцами a, b, c и d:

val sparkSession = ...
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import sparkSession.implicits._
val cols = Seq("a","b","c")
def createDf = (1 to 3).map(i => (i,i,i)).toDF(cols:_*).withColumn("d", concat_ws("_", cols.map(col):_*))
val df1 = createDf
val df2 = createDf

df1 и df2 выглядят одинаково:

+---+---+---+-----+
|  a|  b|  c|    d|
+---+---+---+-----+
|  1|  1|  1|1_1_1|
|  2|  2|  2|2_2_2|
|  3|  3|  3|3_3_3|
+---+---+---+-----+

Когда мы разбиваем оба DataFrames на столбец d и используем этот столбец в качестве условия соединения

df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), "d").explain()

мы получаем план выполнения

== Physical Plan ==
*(3) Project [d#13, a#7, b#8, c#9, a#25, b#26, c#27]
+- *(3) SortMergeJoin [d#13], [d#31], Inner
   :- *(1) Sort [d#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(d#13, 4)
   :     +- LocalTableScan [a#7, b#8, c#9, d#13]
   +- *(2) Sort [d#31 ASC NULLS FIRST], false, 0
      +- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(d#13, 4)

Разделение обоих фреймов данных на d, но объединение a, b и c

df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), cols).explain()

приводит к плану выполнения

== Physical Plan ==
*(3) Project [a#7, b#8, c#9, d#13, d#31]
+- *(3) SortMergeJoin [a#7, b#8, c#9], [a#25, b#26, c#27], Inner
   :- *(1) Sort [a#7 ASC NULLS FIRST, b#8 ASC NULLS FIRST, c#9 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#7, b#8, c#9, 200)
   :     +- Exchange hashpartitioning(d#13, 4)
   :        +- LocalTableScan [a#7, b#8, c#9, d#13]
   +- *(2) Sort [a#25 ASC NULLS FIRST, b#26 ASC NULLS FIRST, c#27 ASC NULLS FIRST], false, 0
      +- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(a#7, b#8, c#9, 200)

, который содержит на Exchange hashpartitioning больше, чем первый план. В этом случае объединение a, b, c будет медленнее.

С другой стороны, если кадры данных разделены на a, b и cобъединение a, b, c будет быстрее объединения d.

...