Искровой лексический порядок операций - PullRequest
3 голосов
/ 18 июня 2019

Мы все знаем, что в SQL, как правило, у нас есть определенный порядок лексических операций при написании кода:

SELECT ...
FROM ...
JOIN ...
WHERE ...
GROUP BY ...
HAVING ...
ORDER BY ...

Как это проявляется в Spark?Я знаю, что все дело в атрибутах конкретных объектов, поэтому, если я могу задать вопрос по-другому - какой полезный способ подумать о лексическом порядке операций при написании приложений Spark для людей, пришедших из SQL?

Чтобы проиллюстрировать мою путаницу.Вот два фрагмента кода из моих тестов, в которых я поместил orderBy в двух совершенно разных местах (опять же, исходя из фона SQL), но код дал точно такие же результаты:

tripDatawithDT \
.filter(tripData["Subscriber Type"] == "Subscriber")\
.orderBy(desc("End Date DT"))\
.groupBy("End Date DT")\
.count()\
.show()


tripDatawithDT \
.filter(tripData["Subscriber Type"] == "Subscriber")\
.groupBy("End Date DT")\
.count()\
.orderBy(desc("End Date DT"))\
.show()

Тем не менее,В других случаях я полностью испортил свой код из-за неправильного лексического порядка операций.

1 Ответ

4 голосов
/ 24 июня 2019

TL; DR Пока вы используете стандартную сборку с открытым исходным кодом без пользовательского оптимизатора Rules, вы можете предположить, что каждая операция DSL вызывает логический подзапрос, и все логические оптимизации соответствуют SQL: 2003 стандарт. Другими словами, ваш SQL должен применяться здесь.

Внутренне Spark представляет SQL-запросы в виде дерева LogicalPlans, где каждый оператор соответствует отдельному узлу со своими входными данными в качестве дочерних.

В результате неоптимизированный логический план, соответствующий выражению DSL, состоит из вложенного узла для каждого оператора (проекция, выбор, упорядочение, агрегация с группировкой или без нее). Итак, с учетом таблицы

from pyspark.sql.functions import col, desc

t0 = spark.createDataFrame(
    [], "`End Date DT` timestamp, `Subscriber Type` string"
)
t0.createOrReplaceTempView("t0")

первый запрос

(t0.alias("t0")
    .filter(col("Subscriber Type") == "Subscriber").alias("t1")
    .orderBy(desc("End Date DT")).alias("t2")
    .groupBy("End Date DT")
    .count())

примерно эквивалентно

SELECT `End Date DT`, COUNT(*)  AS count FROM (
    SELECT * FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 ORDER BY `End Date DT` DESC
) as t2 GROUP BY `End Date DT`

, а

(t0.alias("t0")
    .filter(col("Subscriber Type") == "Subscriber").alias("t1")
    .groupBy("End Date DT")
    .count().alias("t2")
    .orderBy(desc("End Date DT")))

примерно соответствует *

SELECT * FROM (
    SELECT `End Date DT`, COUNT(*) AS count FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 GROUP BY `End Date DT`
) as t2 ORDER BY `End Date DT` DESC

Очевидно, что оба запроса не эквивалентны, и это отражено в их оптимизированных планах выполнения.

ORDER BY до GROUP BY соответствует

== Optimized Logical Plan ==
Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#70L]
+- Sort [End Date DT#38 DESC NULLS LAST], true
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false

, в то время как ORDER BY после GROUP BY соответствует

== Optimized Logical Plan ==
Sort [End Date DT#38 DESC NULLS LAST], true
+- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#84L]
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false

Так почему же они могут дать одинаковый конечный результат? Это связано с тем, что в таких основных случаях, как здесь, планировщик запросов будет рассматривать предшествующий ORDER BY как подсказку для применения разбиения диапазона вместо хеш-разбиения. Поэтому физический план для ORDER BY, за которым следует GROUP BY, будет

== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- *(2) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
   +- *(2) Sort [End Date DT#38 DESC NULLS LAST], true, 0
      +- Exchange rangepartitioning(End Date DT#38 DESC NULLS LAST, 200)
         +- *(1) Project [End Date DT#38]
            +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
               +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]

без ORDER BY*** по умолчанию будет хеш-секционирование

== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- Exchange hashpartitioning(End Date DT#38, 200)
   +- *(1) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
      +- *(1) Project [End Date DT#38]
         +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
            +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]

Поскольку это происходит на этапе планирования, который является важной точкой расширения (особенно для поставщиков источников данных), я бы рассмотрел это как деталь реализации, и не буду зависеть от этого поведения для правильности.


* С разобранным логическим планом для варианта DSL

== Parsed Logical Plan ==
'Aggregate ['End Date DT], [unresolvedalias('End Date DT, None), count(1) AS count#45L]
+- SubqueryAlias `t2`
   +- Sort [End Date DT#38 DESC NULLS LAST], true
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false

и для варианта SQL

== Parsed Logical Plan ==
'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#50]
+- 'SubqueryAlias `t2`
   +- 'Sort ['End Date DT DESC NULLS LAST], true
      +- 'Project [*]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`

** С разобранным логическим планом для варианта DSL

== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- SubqueryAlias `t2`
   +- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#59L]
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false

и для варианта SQL

== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- 'Project [*]
   +- 'SubqueryAlias `t2`
      +- 'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#64]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`

***, т. Е.

(t0.alias("t0") 
    .filter(col("Subscriber Type") == "Subscriber").alias("t1") 
    .groupBy("End Date DT") 
    .count()).explain()   
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...