Используя выражение столбца, оптимизатор Spark имеет возможность оптимизировать запрос, поскольку он может «заглядывать» в фильтр и, возможно, перемещать его в лучшее место, чтобы сократить время выполнения.
Пример :
Изображение, в котором у вас был набор данных, состоящий из двух столбцов id
и data
, и ваши логики c сначала сгруппировали набор данных по столбцу id
и суммировали значения data
. После этой операции группировки должна быть сохранена только группа с id = 2
. В этом случае было бы быстрее сначала выполнить фильтрацию, а затем суммировать. Применяя фильтр как выражение столбца, Spark может обнаружить эту оптимизацию и сначала применить фильтр:
val dfParquet = spark.read.parquet(<path to data>)
val groupedDf = dfParquet.groupBy("id").sum("data")
val groupedDfWithColumnFilter = groupedDf.filter("id = 2")
val groupedDfWithFilterFunction = groupedDf.filter(_.get(0).equals(2))
Если мы проверим план выполнения groupedDfWithColumnFilter
, мы получим
== Physical Plan ==
HashAggregate(keys=[id#0L], functions=[sum(data#1L)])
+- Exchange hashpartitioning(id#0L, 200)
+- HashAggregate(keys=[id#0L], functions=[partial_sum(data#1L)])
+- Project [id#0L, data#1L]
<b>+- Filter (isnotnull(id#0L) && (id#0L = 2))</b>
+- FileScan parquet [id#0L,data#1L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:.../id], PartitionFilters: [],
<b>PushedFilters: [IsNotNull(id), EqualTo(id,2)]</b>, ReadSchema: struct
Так что Фильтр применяется первым и даже передается в программу чтения файлов паркета.
План выполнения groupedDfWithFilterFunction
, однако, показывает, что Spark не может выполнить эту оптимизацию, и применяет фильтр в качестве последнего шага, таким образом, теряя оптимизацию:
== Physical Plan ==
<b>Filter <function1>.apply</b>
+- HashAggregate(keys=[id#0L], functions=[sum(data#1L)])
+- Exchange hashpartitioning(id#0L, 200)
+- HashAggregate(keys=[id#0L], functions=[partial_sum(data#1L)])
+- FileScan parquet [id#0L,data#1L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:.../id], PartitionFilters: [],
<b>PushedFilters: []</b>, ReadSchema: struct
Еще один способ увидеть разницу - взглянуть на интерфейс Spark. Для моего теста я создал файл паркета с 10 миллионами строк в 100 разделах. На вкладке SQL видно, что из-за проталкиваемых фильтров для groupedDfWithColumnFilter
Spark загружает с диска только около 200К строк данных, тогда как для groupedDfWithFilterFunction
Spark необходимо загружать все 10 миллионов строк: