Почему нажатие на предикат не используется в типизированном API набора данных (в отличие от нетипизированного API DataFrame)? - PullRequest
0 голосов
/ 02 мая 2018

Я всегда думал, что API набора данных / фрейма данных одинаковы ... и единственное отличие состоит в том, что API набора данных обеспечит вам безопасность времени компиляции. Правильно?

Итак .. У меня очень простой случай:

 case class Player (playerID: String, birthYear: Int)

 val playersDs: Dataset[Player] = session.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(PeopleCsv)
  .as[Player]

 // Let's try to find players born in 1999. 
 // This will work, you have compile time safety... but it will not use predicate pushdown!!!
 playersDs.filter(_.birthYear == 1999).explain()

 // This will work as expected and use predicate pushdown!!!
 // But you can't have compile time safety with this :(
 playersDs.filter('birthYear === 1999).explain()

Объяснение из первого примера покажет, что он НЕ выполняет предикатное нажатие (обратите внимание на пустые PressedFilters):

== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

В то время как второй пример будет делать это правильно (обратите внимание на PressedFilters):

== Physical Plan ==
*(1) Project [.....]
+- *(1) Filter (isnotnull(birthYear#11) && (birthYear#11 = 1999))
   +- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

Итак, вопрос в том ... как я могу использовать DS Api и обеспечить безопасность времени компиляции ... и предикатное нажатие, работающее так, как ожидалось ????

Возможно ли это? Если нет .. значит ли это, что DS api обеспечивает безопасность времени компиляции ... но за счет производительности! ??? (DF будет намного быстрее в этом случае .. особенно при обработке больших файлов паркета)

1 Ответ

0 голосов
/ 02 мая 2018

Это строка в вашем физическом плане, которую вы должны помнить, чтобы знать реальную разницу между Dataset[T] и DataFrame (то есть Dataset[Row]).

Filter <function1>.apply

Я продолжаю говорить, что людям следует держаться подальше от типизированного API набора данных и продолжать использовать нетипизированный API DataFrame, поскольку код Scala становится черным ящиком для оптимизатора во многих местах. Вы только что нажали одну из них и подумали также о десериализации всех объектов, которые Spark SQL хранит вдали от JVM, чтобы избежать сборщиков мусора. Каждый раз, когда вы касаетесь объектов, вы буквально просите Spark SQL десериализовать объекты и загрузить их в JVM, что создает большую нагрузку на GC (которая будет чаще запускаться с типизированным API набора данных по сравнению с нетипизированным API DataFrame).

См. UDF - это черный ящик - не используйте их, если у вас нет выбора .


Цитата Рейнольд Синь после того, как я задал тот же вопрос в списке рассылки dev@spark.a.o :

UDF - это черный ящик, поэтому Спарк не может знать, с чем имеет дело. Там простые случаи, в которых мы можем проанализировать байт-код UDF и сделать вывод, что это делает, но в целом это довольно сложно сделать.

Для таких случаев есть билет JIRA SPARK-14083 Анализировать байт-код JVM и превращать замыкания в выражения Catalyst , но, как кто-то сказал (я думаю, это был Адам Б. в твиттере), это будет какая-то шутка ожидать этого в ближайшее время.

Одним из больших преимуществ API набора данных является безопасность типов за счет снижения производительности из-за сильной зависимости от заданных пользователем замыканий / лямбд. Эти замыкания обычно медленнее, чем выражения, потому что у нас больше гибкости для оптимизации выражений (известные типы данных, отсутствие вызовов виртуальных функций и т. Д.). Во многих случаях на самом деле не будет очень сложно изучить байт-код этих замыканий и выяснить, что они пытаются сделать. Если мы сможем их понять, то сможем превратить их непосредственно в выражения Catalyst для более оптимизированного выполнения.


// Let's try to find players born in 1999. 
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()

Вышеуказанный код эквивалентен следующему:

val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999
playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()

someCodeSparkSQLCannotDoMuchOutOfIt - это именно то место, где вы откладываете оптимизации и позволяете Spark Optimizer пропустить его.

...