Эффективное выполнение на фреймах данных PySpark / Delta - PullRequest
0 голосов
/ 01 ноября 2019

Используя pyspark / Delta lakes на Databricks, у меня есть следующий сценарий:

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)

analysis_1 = result.groupBy(...).count() # transformation performed here
analysis_2 = result.groupBy(...).count() # transformation performed here

Как я понимаю, Spark с Delta lakes из-за цепного выполнения, result фактически не вычисляется при объявлении, носкорее, когда он используется.

Однако в этом примере он используется несколько раз, и, следовательно, самое дорогое преобразование вычисляется несколько раз.

Возможно ли принудительное выполнение в какой-то моментв коде, например

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
result.force() # transformation performed here??

analysis_1 = result.groupBy(...).count() # quick smaller transformation??
analysis_2 = result.groupBy(...).count() # quick smaller transformation??

1 Ответ

0 голосов
/ 02 ноября 2019

Вопрос, на мой взгляд, повсеместен или неясен. Но если вы новичок в Spark, то это может иметь место.

Итак:

Для использования .force см. https://blog.knoldus.com/getting-lazy-with-scala/ .forceне будет работать с Dataset или Dataframe.

Это как-то связано с подходом pyspark или Delta Lake? Нет, нет.

analysis_1 = result.groupBy(...).count() # quick smaller transformation?? 
  • На самом деле это Действие с Преобразованиями, которое, скорее всего, ведет к перетасовке.

Итак, я думаю, что вы имеете в виду как наш уважаемый ударговорится следующее:

  • .cache или .persist

Я подозреваю, что вам понадобится:

result.cache 

Это будет означать ваше 2nd Action analysis_2 не нужно будет заново вычислять весь путь до источника, показанного здесь

(2) Spark Jobs
Job 16 View(Stages: 3/3)
Stage 43: 
8/8
succeeded / total tasks 
Stage 44: 
200/200
succeeded / total tasks 
Stage 45:   
1/1
succeeded / total tasks 
Job 17 View(Stages: 2/2, 1 skipped)
Stage 46: 
0/8
succeeded / total tasks skipped
Stage 47: 
200/200
succeeded / total tasks 
Stage 48:   
1/1
succeeded / total tasks 

С улучшениями, внесенными в Spark, разделы в случайном порядке сохраняются, что в некоторых случаях также приводит к пропущенным этапами, в частности, для СДР. Для фреймов данных требуется кэширование, чтобы получить эффект пропущенных этапов, который я наблюдаю.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...