широковещательное хеш-соединение не работает с полным объединением в Spark SQL? - PullRequest
0 голосов
/ 03 марта 2019

У меня есть две маленькие таблицы, которые должны выполнять полное внешнее объединение следующим образом, я думал, что оно должно использовать широковещательное соединение, но выбрано Sort Merge Join, я хотел бы знать, почему.

  test("SparkTest 0461") {

    val spark = SparkSession.builder().master("local").appName("SparkTest0460").getOrCreate()
    import spark.implicits._
    val data1 = Seq((1, 2), (1, 7), (3, 6), (5, 4), (1, 10), (6, 7), (2, 5))
    val data2 = Seq(9, 4, 2, 7, 6, 8)
    val x = 10L * 1024*1024
    spark.sql(s"set spark.sql.autoBroadcastJoinThreshold=$x")
    spark.createDataset(data1).toDF("a", "b").createOrReplaceTempView("x")
    spark.createDataset(data2).toDF("c").createOrReplaceTempView("y")
    val df = spark.sql(
      """
         select * from x full join y on a = c
      """.stripMargin(' '))
      df.explain(true)
  }

Физический план выглядит следующим образом, что показывает, что он использует SMJ

== Physical Plan ==
SortMergeJoinExec [a#11], [c#19], FullOuter
:- *(1) SortExec [a#11 ASC NULLS FIRST], false, 0
:  +- ShuffleExchangeExec hashpartitioning(a#11, 200)
:     +- LocalTableScanExec [a#11, b#12]
+- *(2) SortExec [c#19 ASC NULLS FIRST], false, 0
   +- ShuffleExchangeExec hashpartitioning(c#19, 200)
      +- LocalTableScanExec [c#19]

1 Ответ

0 голосов
/ 03 марта 2019

BroadcastHashJoin не поддерживается для full outer join.Проверьте эту ссылку для получения подробной информации.

Если вы замените full outer join на любое из поддерживаемых объединений, физический план покажет, что он выбрал BroadcastHashJoin.

Например,

val dfOuter = spark.sql(""" select * from x outer join y on a = c """.stripMargin(' '))
dfOuter.explain(true)

дает

== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, ('a = 'c)
   :- 'SubqueryAlias outer
   :  +- 'UnresolvedRelation `x`
   +- 'UnresolvedRelation `y`

== Analyzed Logical Plan ==
a: int, b: int, c: int
Project [a#75, b#76, c#82]
+- Join Inner, (a#75 = c#82)
   :- SubqueryAlias outer
   :  +- SubqueryAlias x
   :     +- Project [_1#72 AS a#75, _2#73 AS b#76]
   :        +- LocalRelation [_1#72, _2#73]
   +- SubqueryAlias y
      +- Project [value#80 AS c#82]
         +- LocalRelation [value#80]

== Optimized Logical Plan ==
Join Inner, (a#75 = c#82)
:- LocalRelation [a#75, b#76]
+- LocalRelation [c#82]

== Physical Plan ==
*(1) BroadcastHashJoin [a#75], [c#82], Inner, BuildRight
:- LocalTableScan [a#75, b#76]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [c#82]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...