TL; DR Пока вы используете стандартную сборку с открытым исходным кодом без пользовательского оптимизатора Rules
, вы можете предположить, что каждая операция DSL вызывает логический подзапрос, и все логические оптимизации соответствуют SQL: 2003 стандарт. Другими словами, ваш SQL должен применяться здесь.
Внутренне Spark представляет SQL-запросы в виде дерева LogicalPlans
, где каждый оператор соответствует отдельному узлу со своими входными данными в качестве дочерних.
В результате неоптимизированный логический план, соответствующий выражению DSL, состоит из вложенного узла для каждого оператора (проекция, выбор, упорядочение, агрегация с группировкой или без нее). Итак, с учетом таблицы
from pyspark.sql.functions import col, desc
t0 = spark.createDataFrame(
[], "`End Date DT` timestamp, `Subscriber Type` string"
)
t0.createOrReplaceTempView("t0")
первый запрос
(t0.alias("t0")
.filter(col("Subscriber Type") == "Subscriber").alias("t1")
.orderBy(desc("End Date DT")).alias("t2")
.groupBy("End Date DT")
.count())
примерно эквивалентно
SELECT `End Date DT`, COUNT(*) AS count FROM (
SELECT * FROM (
SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
) as t1 ORDER BY `End Date DT` DESC
) as t2 GROUP BY `End Date DT`
, а
(t0.alias("t0")
.filter(col("Subscriber Type") == "Subscriber").alias("t1")
.groupBy("End Date DT")
.count().alias("t2")
.orderBy(desc("End Date DT")))
примерно соответствует *
SELECT * FROM (
SELECT `End Date DT`, COUNT(*) AS count FROM (
SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
) as t1 GROUP BY `End Date DT`
) as t2 ORDER BY `End Date DT` DESC
Очевидно, что оба запроса не эквивалентны, и это отражено в их оптимизированных планах выполнения.
ORDER BY
до GROUP BY
соответствует
== Optimized Logical Plan ==
Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#70L]
+- Sort [End Date DT#38 DESC NULLS LAST], true
+- Project [End Date DT#38]
+- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
+- LogicalRDD [End Date DT#38, Subscriber Type#39], false
, в то время как ORDER BY
после GROUP BY
соответствует
== Optimized Logical Plan ==
Sort [End Date DT#38 DESC NULLS LAST], true
+- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#84L]
+- Project [End Date DT#38]
+- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
+- LogicalRDD [End Date DT#38, Subscriber Type#39], false
Так почему же они могут дать одинаковый конечный результат? Это связано с тем, что в таких основных случаях, как здесь, планировщик запросов будет рассматривать предшествующий ORDER BY
как подсказку для применения разбиения диапазона вместо хеш-разбиения. Поэтому физический план для ORDER BY
, за которым следует GROUP BY
, будет
== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- *(2) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
+- *(2) Sort [End Date DT#38 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(End Date DT#38 DESC NULLS LAST, 200)
+- *(1) Project [End Date DT#38]
+- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
+- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]
без ORDER BY
*** по умолчанию будет хеш-секционирование
== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- Exchange hashpartitioning(End Date DT#38, 200)
+- *(1) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
+- *(1) Project [End Date DT#38]
+- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
+- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]
Поскольку это происходит на этапе планирования, который является важной точкой расширения (особенно для поставщиков источников данных), я бы рассмотрел это как деталь реализации, и не буду зависеть от этого поведения для правильности.
* С разобранным логическим планом для варианта DSL
== Parsed Logical Plan ==
'Aggregate ['End Date DT], [unresolvedalias('End Date DT, None), count(1) AS count#45L]
+- SubqueryAlias `t2`
+- Sort [End Date DT#38 DESC NULLS LAST], true
+- SubqueryAlias `t1`
+- Filter (Subscriber Type#39 = Subscriber)
+- SubqueryAlias `t0`
+- LogicalRDD [End Date DT#38, Subscriber Type#39], false
и для варианта SQL
== Parsed Logical Plan ==
'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#50]
+- 'SubqueryAlias `t2`
+- 'Sort ['End Date DT DESC NULLS LAST], true
+- 'Project [*]
+- 'SubqueryAlias `t1`
+- 'Project [*]
+- 'Filter ('Subscriber Type = Subscriber)
+- 'UnresolvedRelation `t0`
** С разобранным логическим планом для варианта DSL
== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- SubqueryAlias `t2`
+- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#59L]
+- SubqueryAlias `t1`
+- Filter (Subscriber Type#39 = Subscriber)
+- SubqueryAlias `t0`
+- LogicalRDD [End Date DT#38, Subscriber Type#39], false
и для варианта SQL
== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- 'Project [*]
+- 'SubqueryAlias `t2`
+- 'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#64]
+- 'SubqueryAlias `t1`
+- 'Project [*]
+- 'Filter ('Subscriber Type = Subscriber)
+- 'UnresolvedRelation `t0`
***, т. Е.
(t0.alias("t0")
.filter(col("Subscriber Type") == "Subscriber").alias("t1")
.groupBy("End Date DT")
.count()).explain()