Реализация Spark Physical Plan отличается для pyspark и Scala для одной и той же логики - PullRequest
0 голосов
/ 19 марта 2019

Я посмотрел на этот вопрос: Вопрос о присоединении фреймов данных в Spark и ответил на него, основываясь на моих наблюдениях.

Действительно интересный вопрос, так как я отметил, что Физический план выполнения левого (внешнего соединения) отличается при использовании Spark со Scala или при использовании pyspark.

Теперь я отмечаю, что есть различия в pyspark с травлением, но я бы подумал, что - как мне недавно сказали на интервью, может быть, вопрос с подвохом - что Catalyst попытается реализовать те же физические планы под капотом. по большому счету.

Почему это может отличаться?

  • Травление
  • Различные возможности разбиения под pyspark? Не думай так.
  • Настройки трансляции?
  • Другие настройки по умолчанию или понимание, неизвестное мне на основе оболочки pyspark?

Например, количество разделов, используемых для JOIN, и результирующее количество разделов , как показано в ссылке , отличаются с точки зрения обработки (результат).

Любопытно, но я бы подумал, что синтаксис может отличаться, но нижний слой будет общим для pyspark и Spark. Я предполагаю, что засоление как-то связано с этим или разделением. Кстати, запросы не имеют никакого реального смысла, просто надуманные.

Вот физические планы по существу того же самого материала:

Pyspark

df4 = df1.join(df2, on=['key1', 'key2', 'time'], how='left').explain()

== Физический план ==

*(5) Project [key1#474L, key2#475L, time#476L]
+- SortMergeJoin [key1#474L, key2#475L, time#476L], [key1#480L, key2#481L, time#482L], LeftOuter
   :- *(2) Sort [key1#474L ASC NULLS FIRST, key2#475L ASC NULLS FIRST, time#476L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key1#474L, key2#475L, time#476L, 3)
   :     +- *(1) Scan ExistingRDD[key1#474L,key2#475L,time#476L]
    +- *(4) Sort [key1#480L ASC NULLS FIRST, key2#481L ASC NULLS FIRST, time#482L ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(key1#480L, key2#481L, time#482L, 3)
          +- *(3) Filter ((isnotnull(key1#480L) && isnotnull(key2#481L)) && isnotnull(time#482L))
            +- *(3) Scan ExistingRDD[key1#480L,key2#481L,time#482L]

Scala

val df4 = df1.join(df2, Seq("key1", "key2", "time"), "left").explain()

== Физический план ==

 *(1) Project [key1#647, key2#648, time#649]
 +- *(1) BroadcastHashJoin [key1#647, key2#648, time#649], [key1#671, key2#672, time#673], LeftOuter, BuildRight, false
   :- Exchange RoundRobinPartitioning(3)
   :  +- LocalTableScan [key1#647, key2#648, time#649]
   +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, int, false], input[1, int, false], input[2, int, false]))
      +- Exchange RoundRobinPartitioning(3)
         +- LocalTableScan [key1#671, key2#672, time#673]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...