почему фильтрация с использованием выражения sql лучше по сравнению с функцией, применяемой в DataSet в Spark - PullRequest
0 голосов
/ 27 марта 2020

Я читаю книгу руководств по ультимативному искру, и утверждается, что:

, указав функцию, которую мы принудительно вызываем, чтобы оценить эту функцию в каждой строке в нашем наборе данных ... Для Для простых фильтров всегда предпочтительнее написать sql выражение

Я не понимаю, почему выражение sql будет лучше, поскольку выражение будет также применяться ко всем строкам набора данных !!
Может кто-нибудь дать мне больше деталей?

1 Ответ

2 голосов
/ 27 марта 2020

Используя выражение столбца, оптимизатор Spark имеет возможность оптимизировать запрос, поскольку он может «заглядывать» в фильтр и, возможно, перемещать его в лучшее место, чтобы сократить время выполнения.

Пример :

Изображение, в котором у вас был набор данных, состоящий из двух столбцов id и data, и ваши логики c сначала сгруппировали набор данных по столбцу id и суммировали значения data. После этой операции группировки должна быть сохранена только группа с id = 2. В этом случае было бы быстрее сначала выполнить фильтрацию, а затем суммировать. Применяя фильтр как выражение столбца, Spark может обнаружить эту оптимизацию и сначала применить фильтр:

val dfParquet = spark.read.parquet(<path to data>)
val groupedDf = dfParquet.groupBy("id").sum("data")
val groupedDfWithColumnFilter = groupedDf.filter("id = 2")
val groupedDfWithFilterFunction = groupedDf.filter(_.get(0).equals(2))

Если мы проверим план выполнения groupedDfWithColumnFilter, мы получим

== Physical Plan ==
HashAggregate(keys=[id#0L], functions=[sum(data#1L)])
+- Exchange hashpartitioning(id#0L, 200)
   +- HashAggregate(keys=[id#0L], functions=[partial_sum(data#1L)])
      +- Project [id#0L, data#1L]
         <b>+- Filter (isnotnull(id#0L) && (id#0L = 2))</b>
            +- FileScan parquet [id#0L,data#1L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:.../id], PartitionFilters: [], 
                 <b>PushedFilters: [IsNotNull(id), EqualTo(id,2)]</b>, ReadSchema: struct

Так что Фильтр применяется первым и даже передается в программу чтения файлов паркета.

План выполнения groupedDfWithFilterFunction, однако, показывает, что Spark не может выполнить эту оптимизацию, и применяет фильтр в качестве последнего шага, таким образом, теряя оптимизацию:

== Physical Plan ==
<b>Filter <function1>.apply</b>
+- HashAggregate(keys=[id#0L], functions=[sum(data#1L)])
   +- Exchange hashpartitioning(id#0L, 200)
      +- HashAggregate(keys=[id#0L], functions=[partial_sum(data#1L)])
         +- FileScan parquet [id#0L,data#1L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:.../id], PartitionFilters: [], 
              <b>PushedFilters: []</b>, ReadSchema: struct

Еще один способ увидеть разницу - взглянуть на интерфейс Spark. Для моего теста я создал файл паркета с 10 миллионами строк в 100 разделах. На вкладке SQL видно, что из-за проталкиваемых фильтров для groupedDfWithColumnFilter Spark загружает с диска только около 200К строк данных, тогда как для groupedDfWithFilterFunction Spark необходимо загружать все 10 миллионов строк:

...