val против def производительности на Spark Dataframe - PullRequest
0 голосов
/ 25 февраля 2019

Следующий код и, следовательно, вопрос о производительности - представьте, конечно, в масштабе:

import org.apache.spark.sql.types.StructType

val df = sc.parallelize(Seq(
   ("r1", 1, 1),
   ("r2", 6, 4),
   ("r3", 4, 1),
   ("r4", 1, 2)
   )).toDF("ID", "a", "b")

val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

// or

def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

df.withColumn("ones", ones).explain

Здесь под двумя физическими планами для использования def и val - которые одинаковы:

 == Physical Plan == **def**
 *(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#762 = 1) THEN 1 ELSE 0 END) AS ones#770]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#760, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#761, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#762]
   +- Scan[obj#759]


 == Physical Plan == **val**
 *(1) Project [_1#780 AS ID#784, _2#781 AS a#785, _3#782 AS b#786, (CASE WHEN (_2#781 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#782 = 1) THEN 1 ELSE 0 END) AS ones#790]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#780, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#781, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#782]
    +- Scan[obj#779] 

Итак, есть обсуждение:

val против def performance.

Затем:

  • Iне вижу разницы в .explains.ОК.

  • Из других мест: val вычисляется при определении, def - при вызове.

  • Я предполагаю, что не имеет значения, используется ли здесь значение val или def, поскольку оно, по существу, внутри цикла и происходит уменьшение.Это правильно?
  • Будет ли выполнено df.schema.map (c => c.name) .drop (1) для каждой строки данных?Там, конечно, нет необходимости.Оптимизирует ли это Catalyst?
  • Если вышеприведенное верно в том смысле, что оператор выполняется каждый раз для обработки столбцов, как мы можем заставить этот фрагмент кода появляться только один раз?Должны ли мы сделать значение val ones = df.schema.map (c => c.name) .drop (1)
  • val, def большечем Scala, также компонент Spark.

Для -1er я спрашиваю так, поскольку следующее очень ясно, но в val есть больше, чем в приведенном ниже коде, и ниже не повторяется:

var x = 2 // using var as I need to change it to 3 later
val sq = x*x // evaluates right now
x = 3 // no effect! sq is already evaluated
println(sq)

Ответы [ 2 ]

0 голосов
/ 25 февраля 2019

Здесь есть две основные концепции: создание и оценка Spark DAG и определения val против def от Scala, они ортогональны

Я не вижу никакой разницы в .explains

Вы не видите никакой разницы, потому что с точки зрения Spark запрос такой же.Для анализатора не имеет значения, сохраняете ли вы график в val или создаете его каждый раз с помощью def.

Из другого места: val вычисляется, когда определено, def - когда вызывается.

Это семантика Scala.val является неизменной ссылкой, которая оценивается один раз на сайте декларации.def обозначает определение метода, и если вы выделите новый DataFrame внутри него, он будет создавать его каждый раз, когда вы вызываете его.Например:

def ones = 
  df
   .schema
   .map(c => c.name)
   .drop(1)
   .map(x => when(col(x) === 1, 1).otherwise(0))
   .reduce(_ + _)

val firstcall = ones
val secondCall = ones

Приведенный выше код создаст две отдельные группы доступности базы данных поверх DF.

Я предполагаю, что не имеет значения, используется ли здесь значение val или def какэто по существу внутри цикла, и есть уменьшение.Это правильно?

Я не уверен, о какой петле вы говорите, но посмотрите на мой ответ выше для различия между ними.

Will df.schema.map (c => c.name) .drop (1) будет выполняться для каждой строки данных?Там, конечно, нет необходимости.Оптимизирует ли это Catalyst?

Нет, drop(1) произойдет для всего фрейма данных, что по существу заставит его отбросить только первую строку.

Если вышеверно то, что оператор выполняется каждый раз для обработки столбцов, как мы можем заставить этот фрагмент кода появляться только один раз?Должны ли мы сделать значение val val = df.schema.map (c => c.name) .drop (1)

Это происходит только один раз для каждого фрейма данных (что в нашем примере мыточно один из).

0 голосов
/ 25 февраля 2019

Выражение ones не будет оцениваться для каждой строки данных, оно будет вычислено один раз.def get оценивается за вызов.Например, если есть 3 dataframe s, использующие это выражение ones, то выражение ones будет оцениваться 3 раза.Разница между val заключается в том, что выражение будет оцениваться только один раз.

По сути, выражение ones создает экземпляр org.apache.spark.sql.Column, где org.apache.spark.sql.Column = (CASE WHEN (a = 1) THEN 1 ELSE 0 END + CASE WHEN (b = 1) THEN 1 ELSE 0 END).Если выражение является def, то каждый раз при его вызове создается новый org.apache.spark.sql.Column.Если выражение val, то один и тот же экземпляр используется снова и снова.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...