Проблема в том, что после сохранения ваших данных second_id
включается в кэшированную таблицу и больше не считается постоянной.В результате планировщик больше не может сделать вывод, что запрос должен быть выражен декартовым произведением, и использует стандарт SortMergeJoin
для хеш-секционирования second_id
.
Было бы тривиально достичь того же результата без настойчивости,используя udf
from pyspark.sql.functions import lit, pandas_udf, PandasUDFType
@pandas_udf('integer', PandasUDFType.SCALAR)
def identity(x):
return x
second_df = second_df.withColumn('second_id', identity(lit(1)))
result_df = first_df.join(second_df,
first_df.first_id == second_df.second_id,
'inner')
result_df.explain()
== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
: +- *(1) Filter isnotnull(first_id#4)
: +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(second_id#129, 200)
+- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
+- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
+- *(3) Project [some_value#6]
+- *(3) Filter isnotnull(pythonUDF0#153)
+- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
+- Scan ExistingRDD[some_value#6]
Однако SortMergeJoin
это , а не то, что вы должны попробоватьдостичь здесь.С постоянным ключом это может привести к экстремальному перекосу данных и, вероятно, к сбою в любых данных, кроме игрушечных.
Декартово произведение, как бы дорого оно ни было, не будет страдать от этой проблемы и должно бытьздесь предпочтительнее.Поэтому рекомендуется включить перекрестные объединения или использовать явный синтаксис перекрестного объединения ( spark.sql.crossJoin.enabled для Spark 2.x ) и двигаться дальше.
Остается нерешенным вопрос о том, как предотвратитьнежелательное поведение при кэшировании данных.К сожалению, у меня нет готового ответа для этого.Я вполне уверен, что можно использовать пользовательские правила оптимизатора, но это не то, что можно сделать только с помощью Python.