Является ли переменная SparkSession поэтапно спарк-оболочкой (scala) в значении val или var? - PullRequest
0 голосов
/ 25 февраля 2020

Я пытаюсь преобразовать свои сценарии Spark Scala (написанные в spark-shell) в Scala Класс, Объект, методы (def) и т. Д. c. поэтому я создаю JAR для spark-submit. Я делаю много звонков, используя Spark SQL, который выполняет много вычислений с отметкой времени относительно часового пояса. Я должен установить следующую конфигурацию явно (поскольку каждый распределенный узел может иметь настроенный часовой пояс по умолчанию), чтобы убедиться, что мой часовой пояс всегда будет UT C для любых последующих манипуляций с меткой времени Spark SQL при любых вызовах функций Spark SQL ( блок кода) в этом методе.

spark.conf.set("spark.sql.session.timeZone", "UTC")

Если в сигнатуру метода входит (spark: org. apache .spark. sql .SparkSession) в качестве параметра, то я всегда могу начать с явного кода заявление для установки часового пояса в UT C в SparkSession без каких-либо шансов (что все распределенные узлы Spark могут иметь или не иметь одинаковые конфигурации часового пояса)?

Следующий вопрос, который у меня возникает, - как мне узнать, является ли переменная "spark", установленная spark-shell, val или var? В поисках ответа на этот вопрос я нашел этот фрагмент кода в надежде выяснить, является ли эта Scala переменная immutable или mutable. Но он не сказал мне, является ли Scala переменная spark var или val. Нужно ли возвращать spark обратно в вызывающую метод после того, как я установил spark.sql.session.timeZone в UTC, потому что я изменил его в своем методе? В настоящее время моя сигнатура метода ожидает два входных параметра (org.apache.spark.sql.SparkSession, org.apache.spark.sql.DataFrame), а выходные данные представляют собой кортеж (org.apache.spark.sql.SparkSession, org.apache.spark.sql.DataFrame).

scala> def manOf[T: Manifest](t: T): Manifest[T] = manifest[T]
manOf: [T](t: T)(implicit evidence$1: Manifest[T])Manifest[T]

scala> manOf(List(1))
res3: Manifest[List[Int]] = scala.collection.immutable.List[Int]

scala> manOf(spark)
res2: Manifest[org.apache.spark.sql.SparkSession] = org.apache.spark.sql.SparkSession

Дополнительный контекст: как часть запуска spark-shell, переменная spark инициализируется следующим образом :

Spark context available as 'sc' (master = yarn, app id = application_1234567890_111111).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_REDACTED)
Type in expressions to have them evaluated.
Type :help for more information.

1 Ответ

1 голос
/ 25 февраля 2020

Спасибо @ Luis Miguel Mejía Suárez за предоставление мне ответов и подсказок в качестве комментариев к моему вопросу. Я реализовал следующий эксперимент, в котором spark является изменяемым объектом, где я просто использовал spark в качестве идентичной ссылки на тот же объект вне метода и внутри метода. Хотя этот нежелательный побочный эффект не является чисто функциональной реализацией, но он избавляет меня от необходимости возвращать объект spark обратно вызывающей стороне для другой последующей обработки. Если у кого-то есть лучшее решение, пожалуйста, поделитесь.

def x(spark: SparkSession, inputDF: DataFrame) = {
  import spark.implicits._
  spark.conf.set("spark.sql.session.timeZone", "UTC") // mutation of the object inside method

  //...spark.sql.functions...
  finalDF
}

Запустил spark-shell и выполнил следующее:

Spark context available as 'sc' (master = yarn, app id = application_1234567890_222222).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_REDACTED)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.conf.get("spark.sql.session.timeZone")
res1: String = America/New_York

scala> :load x.scala
x: (spark: org.apache.spark.sql.SparkSession, inputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

scala> val timeConvertedDF = x(spark, inputDF)
timeConvertedDF: org.apache.spark.sql.DataFrame = [att1: timestamp, att2: string ... 25 more fields]

scala> spark.conf.get("spark.sql.session.timeZone")
res4: String = UTC
...