Я посмотрел на этот вопрос: Вопрос о присоединении фреймов данных в Spark и ответил на него, основываясь на моих наблюдениях.
Действительно интересный вопрос, так как я отметил, что Физический план выполнения левого (внешнего соединения) отличается при использовании Spark со Scala или при использовании pyspark.
Теперь я отмечаю, что есть различия в pyspark с травлением, но я бы подумал, что - как мне недавно сказали на интервью, может быть, вопрос с подвохом - что Catalyst попытается реализовать те же физические планы под капотом. по большому счету.
Почему это может отличаться?
- Травление
- Различные возможности разбиения под pyspark? Не думай так.
- Настройки трансляции?
- Другие настройки по умолчанию или понимание, неизвестное мне на основе оболочки pyspark?
Например, количество разделов, используемых для JOIN, и результирующее количество разделов , как показано в ссылке , отличаются с точки зрения обработки (результат).
Любопытно, но я бы подумал, что синтаксис может отличаться, но нижний слой будет общим для pyspark и Spark. Я предполагаю, что засоление как-то связано с этим или разделением. Кстати, запросы не имеют никакого реального смысла, просто надуманные.
Вот физические планы по существу того же самого материала:
Pyspark
df4 = df1.join(df2, on=['key1', 'key2', 'time'], how='left').explain()
== Физический план ==
*(5) Project [key1#474L, key2#475L, time#476L]
+- SortMergeJoin [key1#474L, key2#475L, time#476L], [key1#480L, key2#481L, time#482L], LeftOuter
:- *(2) Sort [key1#474L ASC NULLS FIRST, key2#475L ASC NULLS FIRST, time#476L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key1#474L, key2#475L, time#476L, 3)
: +- *(1) Scan ExistingRDD[key1#474L,key2#475L,time#476L]
+- *(4) Sort [key1#480L ASC NULLS FIRST, key2#481L ASC NULLS FIRST, time#482L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key1#480L, key2#481L, time#482L, 3)
+- *(3) Filter ((isnotnull(key1#480L) && isnotnull(key2#481L)) && isnotnull(time#482L))
+- *(3) Scan ExistingRDD[key1#480L,key2#481L,time#482L]
Scala
val df4 = df1.join(df2, Seq("key1", "key2", "time"), "left").explain()
== Физический план ==
*(1) Project [key1#647, key2#648, time#649]
+- *(1) BroadcastHashJoin [key1#647, key2#648, time#649], [key1#671, key2#672, time#673], LeftOuter, BuildRight, false
:- Exchange RoundRobinPartitioning(3)
: +- LocalTableScan [key1#647, key2#648, time#649]
+- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, int, false], input[1, int, false], input[2, int, false]))
+- Exchange RoundRobinPartitioning(3)
+- LocalTableScan [key1#671, key2#672, time#673]