Скажи, что я хочу объединить 3 таблицы A, B, C с внутренним объединением и C очень маленьким.
#DUMMY EXAMPLE with IN-MEMORY table, but same issue if load table using spark.read.parquet("")
var A = (1 to 1000000).toSeq.toDF("A")
var B = (1 to 1000000).toSeq.toDF("B")
var C = (1 to 10).toSeq.toDF("C")
И я не могу контролировать, какой порядок объединения мне приведен:
CASE1 = A.join(B,expr("A=B"),"inner").join(C,expr("A=C"),"inner")
CASE2 = A.join(C,expr("A=C"),"inner").join(B,expr("A=B"),"inner")
Запуск обоих шоу CASE1 работает на 30-40% медленнее, чем CASE2.
Поэтому возникает вопрос: как использовать CBO Spark для автоматического перевода CASE1 как CASE2 для загруженной таблицы или таблицы в памятиот паркетного ридера Spark?
Я пытался сделать:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.cbo.enabled", "true")
A.createOrReplaceTempView("A")
spark.sql("ANALYZE TABLE A COMPUTE STATISTICS")
, но это выдает:
org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'a' not found in database 'default'
Любой другой способ активировать CBO без необходимости сохранять таблицув улье?
Приложение:
- Даже с spark.conf.set ("spark.sql.cbo.enabled", "true") в SparkWebUI
- не отображается оценка стоимости, показывающая CASE1.explain! = CASE2.explain
CASE1.explain
== Physical Plan ==
*(5) SortMergeJoin [A#3], [C#13], Inner
:- *(3) SortMergeJoin [A#3], [B#8], Inner
: :- *(1) Sort [A#3 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(A#3, 200)
: : +- LocalTableScan [A#3]
: +- *(2) Sort [B#8 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(B#8, 200)
: +- LocalTableScan [B#8]
+- *(4) Sort [C#13 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(C#13, 200)
+- LocalTableScan [C#13]
CASE2.explain
== Physical Plan ==
*(5) SortMergeJoin [A#3], [B#8], Inner
:- *(3) SortMergeJoin [A#3], [C#13], Inner
: :- *(1) Sort [A#3 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(A#3, 200)
: : +- LocalTableScan [A#3]
: +- *(2) Sort [C#13 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(C#13, 200)
: +- LocalTableScan [C#13]
+- *(4) Sort [B#8 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(B#8, 200)
+- LocalTableScan [B#8]