Обнаружен декартово произведение для соединения INNER в буквальном столбце в PySpark - PullRequest
0 голосов
/ 23 ноября 2018

Следующий код вызывает исключение «Обнаружено декартово произведение для соединения INNER»:

first_df = spark.createDataFrame([{"first_id": "1"}, {"first_id": "1"}, {"first_id": "1"}, ])
second_df = spark.createDataFrame([{"some_value": "????"}, ])

second_df = second_df.withColumn("second_id", F.lit("1"))

# If the next line is uncommented, then the JOIN is working fine.
# second_df.persist()

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')
data = result_df.collect()

result_df.explain()

и показывает, что логический план такой, как показано ниже:

Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

Это выглядит такпо той причине, что в условии JOIN для этих логических планов не существует столбца, когда RuleExecutor применяет набор правил оптимизации под названием CheckCartesianProducts (см. https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114).

Но, если я использую метод «persist» перед JOIN, он работает иФизический план:

*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(first_id#0, 10)
:     +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#4, 10)
      +- InMemoryTableScan [some_value#2, second_id#4]
            +- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
                  +- *(1) Project [some_value#2, 1 AS second_id#4]
                     +- Scan ExistingRDD[some_value#2]

Итак, может быть, кто-то может объяснить внутреннее ведение к таким результатам, потому что сохранение фрейма данных не выглядит как решение.

1 Ответ

0 голосов
/ 23 ноября 2018

Проблема в том, что после сохранения ваших данных second_id включается в кэшированную таблицу и больше не считается постоянной.В результате планировщик больше не может сделать вывод, что запрос должен быть выражен декартовым произведением, и использует стандарт SortMergeJoin для хеш-секционирования second_id.

Было бы тривиально достичь того же результата без настойчивости,используя udf

from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

@pandas_udf('integer', PandasUDFType.SCALAR)                   
def identity(x):                                                   
    return x    

second_df = second_df.withColumn('second_id', identity(lit(1)))

result_df = first_df.join(second_df,
                         first_df.first_id == second_df.second_id,
                         'inner')

result_df.explain()

== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
:     +- *(1) Filter isnotnull(first_id#4)
:        +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#129, 200)
      +- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
         +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
            +- *(3) Project [some_value#6]
               +- *(3) Filter isnotnull(pythonUDF0#153)
                  +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
                     +- Scan ExistingRDD[some_value#6]

Однако SortMergeJoin это , а не то, что вы должны попробоватьдостичь здесь.С постоянным ключом это может привести к экстремальному перекосу данных и, вероятно, к сбою в любых данных, кроме игрушечных.

Декартово произведение, как бы дорого оно ни было, не будет страдать от этой проблемы и должно бытьздесь предпочтительнее.Поэтому рекомендуется включить перекрестные объединения или использовать явный синтаксис перекрестного объединения ( spark.sql.crossJoin.enabled для Spark 2.x ) и двигаться дальше.

Остается нерешенным вопрос о том, как предотвратитьнежелательное поведение при кэшировании данных.К сожалению, у меня нет готового ответа для этого.Я вполне уверен, что можно использовать пользовательские правила оптимизатора, но это не то, что можно сделать только с помощью Python.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...