У меня есть две маленькие таблицы, которые должны выполнять полное внешнее объединение следующим образом, я думал, что оно должно использовать широковещательное соединение, но выбрано Sort Merge Join, я хотел бы знать, почему.
test("SparkTest 0461") {
val spark = SparkSession.builder().master("local").appName("SparkTest0460").getOrCreate()
import spark.implicits._
val data1 = Seq((1, 2), (1, 7), (3, 6), (5, 4), (1, 10), (6, 7), (2, 5))
val data2 = Seq(9, 4, 2, 7, 6, 8)
val x = 10L * 1024*1024
spark.sql(s"set spark.sql.autoBroadcastJoinThreshold=$x")
spark.createDataset(data1).toDF("a", "b").createOrReplaceTempView("x")
spark.createDataset(data2).toDF("c").createOrReplaceTempView("y")
val df = spark.sql(
"""
select * from x full join y on a = c
""".stripMargin(' '))
df.explain(true)
}
Физический план выглядит следующим образом, что показывает, что он использует SMJ
== Physical Plan ==
SortMergeJoinExec [a#11], [c#19], FullOuter
:- *(1) SortExec [a#11 ASC NULLS FIRST], false, 0
: +- ShuffleExchangeExec hashpartitioning(a#11, 200)
: +- LocalTableScanExec [a#11, b#12]
+- *(2) SortExec [c#19 ASC NULLS FIRST], false, 0
+- ShuffleExchangeExec hashpartitioning(c#19, 200)
+- LocalTableScanExec [c#19]