Что является оптимальным в искре: объединение, затем присоединение или присоединение к объединению? - PullRequest
0 голосов
/ 16 марта 2020

Имеются три разных кадра данных: df1 и df2, которые имеют одинаковую схему, и df3. Три кадра данных имеют одно общее поле.

Также учтите, что df1 и df2 имеют около 42 миллионов записей каждая, а df3 имеет около 100 000 записей.

Что является оптимальным в искре:

  • Объединение df1 и df2, затем объединение с df3?
  • Присоединение df1 с df3, объединение df2 с df3, затем объединение этих двух информационных фреймов?

1 Ответ

1 голос
/ 16 марта 2020

Честно говоря, с этими томами это не имеет большого значения.

Глядя на .explain () на обоих подходах, в нем не так много.

A broadcast join - это очевидно в обоих случаях. Кроме того, union не вызывает shuffle, по крайней мере ваш вопрос не подразумевает этого, то есть из-за преобразований, которые могут вызвать это.

То есть производительность равна / должна быть одинаковой. См. Ниже, смоделированный подход DF, но демонстрация обсуждаемых пунктов. Математически не так много, чтобы решить иначе.

Подход 1

import org.apache.spark.sql.functions.{sha1, rand, col}

val randomDF1 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF2 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF3 = (spark.range(1, 100000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val u = randomDF1.union(randomDF2) 
 val u2 = u.join(randomDF3, "id").explain()

== Physical Plan ==
*(4) Project [id#25284L, hash#25296, hash#25326]
+- *(4) BroadcastHashJoin [id#25284L], [id#25314L], Inner, BuildRight
   :- Union
   :  :- *(1) Project [id#25284L, sha1(cast(random_value#25286 as binary)) AS hash#25296]
   :  :  +- *(1) Project [id#25284L, cast(rand(10) as string) AS random_value#25286]
   :  :     +- *(1) Range (1, 42000000, step=1, splits=2)
   :  +- *(2) Project [id#25299L, sha1(cast(random_value#25301 as binary)) AS hash#25311]
   :     +- *(2) Project [id#25299L, cast(rand(10) as string) AS random_value#25301]
   :        +- *(2) Range (1, 42000000, step=1, splits=2)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13264]
   +- *(3) Project [id#25314L, sha1(cast(random_value#25316 as binary)) AS hash#25326]
      +- *(3) Project [id#25314L, cast(rand(10) as string) AS random_value#25316]
         +- *(3) Range (1, 100000, step=1, splits=2)

Подход 2

import org.apache.spark.sql.functions.{sha1, rand, col}

val randomDF1 = (spark.range(1, 42000000)
  .withColumn("random_value", rand(seed=10).cast("string"))
  .withColumn("hash", sha1($"random_value"))
  .drop("random_value")
).toDF("id", "hash")

val randomDF2 = (spark.range(1, 42000000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val randomDF3 = (spark.range(1, 100000)
   .withColumn("random_value", rand(seed=10).cast("string"))
   .withColumn("hash", sha1($"random_value"))
   .drop("random_value")
).toDF("id", "hash")

val u1 = randomDF1.join(randomDF3, "id") 
val u2 = randomDF2.join(randomDF3, "id") 
val u3 = u1.union(u2).explain() 

== Physical Plan ==
Union
:- *(2) Project [id#25335L, hash#25347, hash#25377]
:  +- *(2) BroadcastHashJoin [id#25335L], [id#25365L], Inner, BuildRight
:     :- *(2) Project [id#25335L, sha1(cast(random_value#25337 as binary)) AS hash#25347]
:     :  +- *(2) Project [id#25335L, cast(rand(10) as string) AS random_value#25337]
:     :     +- *(2) Range (1, 42000000, step=1, splits=2)
:     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13409]
:        +- *(1) Project [id#25365L, sha1(cast(random_value#25367 as binary)) AS hash#25377]
:           +- *(1) Project [id#25365L, cast(rand(10) as string) AS random_value#25367]
:              +- *(1) Range (1, 100000, step=1, splits=2)
+- *(4) Project [id#25350L, hash#25362, hash#25377]
   +- *(4) BroadcastHashJoin [id#25350L], [id#25365L], Inner, BuildRight
      :- *(4) Project [id#25350L, sha1(cast(random_value#25352 as binary)) AS hash#25362]
      :  +- *(4) Project [id#25350L, cast(rand(10) as string) AS random_value#25352]
      :     +- *(4) Range (1, 42000000, step=1, splits=2)
  +- ReusedExchange [id#25365L, hash#25377], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13409]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...