Honestly, at these data volumes it makes little difference. Looking at .explain() on both approaches, there is little to separate them. A broadcast join is evident in both cases, and the union does not trigger a shuffle; at least nothing in your question implies transformations that would cause one. So the performance is, or should be, the same. See below a simulated DataFrame version of both approaches that demonstrates the points under discussion; there is not much here that would argue for deciding otherwise.
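Whether the small side gets broadcast is governed by spark.sql.autoBroadcastJoinThreshold (10 MB by default). A minimal sketch of checking that setting and of requesting the broadcast explicitly via the standard broadcast hint from org.apache.spark.sql.functions; big and small are hypothetical placeholders for your own DataFrames:

import org.apache.spark.sql.functions.broadcast

// Size (in bytes) below which Spark auto-broadcasts the smaller join side;
// defaults to 10485760, i.e. 10 MB.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Request a broadcast explicitly, regardless of the optimizer's size estimate.
// `big` and `small` stand in for your own DataFrames.
val joined = big.join(broadcast(small), "id")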
Approach 1
import org.apache.spark.sql.functions.{sha1, rand, col}
import spark.implicits._ // needed for the $"column" syntax below (pre-imported in spark-shell)

// Two large DataFrames (~42M rows each) and one small one (~100K rows),
// each holding (id, hash):
val randomDF1 = (spark.range(1, 42000000)
.withColumn("random_value", rand(seed=10).cast("string"))
.withColumn("hash", sha1($"random_value"))
.drop("random_value")
).toDF("id", "hash")
val randomDF2 = (spark.range(1, 42000000)
.withColumn("random_value", rand(seed=10).cast("string"))
.withColumn("hash", sha1($"random_value"))
.drop("random_value")
).toDF("id", "hash")
val randomDF3 = (spark.range(1, 100000)
.withColumn("random_value", rand(seed=10).cast("string"))
.withColumn("hash", sha1($"random_value"))
.drop("random_value")
).toDF("id", "hash")
val u = randomDF1.union(randomDF2)
val u2 = u.join(randomDF3, "id")
u2.explain()
== Physical Plan ==
*(4) Project [id#25284L, hash#25296, hash#25326]
+- *(4) BroadcastHashJoin [id#25284L], [id#25314L], Inner, BuildRight
:- Union
: :- *(1) Project [id#25284L, sha1(cast(random_value#25286 as binary)) AS hash#25296]
: : +- *(1) Project [id#25284L, cast(rand(10) as string) AS random_value#25286]
: : +- *(1) Range (1, 42000000, step=1, splits=2)
: +- *(2) Project [id#25299L, sha1(cast(random_value#25301 as binary)) AS hash#25311]
: +- *(2) Project [id#25299L, cast(rand(10) as string) AS random_value#25301]
: +- *(2) Range (1, 42000000, step=1, splits=2)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13264]
+- *(3) Project [id#25314L, sha1(cast(random_value#25316 as binary)) AS hash#25326]
+- *(3) Project [id#25314L, cast(rand(10) as string) AS random_value#25316]
+- *(3) Range (1, 100000, step=1, splits=2)
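The single BroadcastExchange (for randomDF3) is the only exchange in this plan: the Union of the two large ranges feeds one BroadcastHashJoin with no shuffle. A hedged way to verify this programmatically; the string matched here assumes a shuffle prints as "Exchange hashpartitioning" in this Spark version's plan text:

// Render the executed plan as text and confirm the only exchange is the broadcast.
val planText = u.join(randomDF3, "id").queryExecution.executedPlan.toString
assert(planText.contains("BroadcastExchange"))           // broadcast of randomDF3
assert(!planText.contains("Exchange hashpartitioning"))  // would indicate a shuffle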
Approach 2
import org.apache.spark.sql.functions.{sha1, rand, col}
import spark.implicits._ // needed for the $"column" syntax below (pre-imported in spark-shell)
val randomDF1 = (spark.range(1, 42000000)
.withColumn("random_value", rand(seed=10).cast("string"))
.withColumn("hash", sha1($"random_value"))
.drop("random_value")
).toDF("id", "hash")
val randomDF2 = (spark.range(1, 42000000)
.withColumn("random_value", rand(seed=10).cast("string"))
.withColumn("hash", sha1($"random_value"))
.drop("random_value")
).toDF("id", "hash")
val randomDF3 = (spark.range(1, 100000)
.withColumn("random_value", rand(seed=10).cast("string"))
.withColumn("hash", sha1($"random_value"))
.drop("random_value")
).toDF("id", "hash")
val u1 = randomDF1.join(randomDF3, "id")
val u2 = randomDF2.join(randomDF3, "id")
val u3 = u1.union(u2)
u3.explain()
== Physical Plan ==
Union
:- *(2) Project [id#25335L, hash#25347, hash#25377]
: +- *(2) BroadcastHashJoin [id#25335L], [id#25365L], Inner, BuildRight
: :- *(2) Project [id#25335L, sha1(cast(random_value#25337 as binary)) AS hash#25347]
: : +- *(2) Project [id#25335L, cast(rand(10) as string) AS random_value#25337]
: : +- *(2) Range (1, 42000000, step=1, splits=2)
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13409]
: +- *(1) Project [id#25365L, sha1(cast(random_value#25367 as binary)) AS hash#25377]
: +- *(1) Project [id#25365L, cast(rand(10) as string) AS random_value#25367]
: +- *(1) Range (1, 100000, step=1, splits=2)
+- *(4) Project [id#25350L, hash#25362, hash#25377]
+- *(4) BroadcastHashJoin [id#25350L], [id#25365L], Inner, BuildRight
:- *(4) Project [id#25350L, sha1(cast(random_value#25352 as binary)) AS hash#25362]
: +- *(4) Project [id#25350L, cast(rand(10) as string) AS random_value#25352]
: +- *(4) Range (1, 42000000, step=1, splits=2)
+- ReusedExchange [id#25365L, hash#25377], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#13409]
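The ReusedExchange at the bottom is the key detail: the broadcast of randomDF3 is built once for the first join and reused for the second, so Approach 2 does not pay for the small side twice. Both plans scan the two large ranges exactly once and broadcast the small one exactly once, which is why the two approaches are, or should be, equivalent in practice.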