Обработка 1 миллиарда записей локально в Spark из метастафа Hive (формат паркета) занимает вечно 6 часов. Как ускорить это? - PullRequest
1 голос
/ 22 февраля 2020

Я запускаю все в IntelliJ. Приложение находится в Scala.

scalaVersion: = "2.12.10" sparkVersion = "2.4.4"

Так что я работаю с NY C данные такси. почти 1,1 миллиарда записей, 120 ГБ данных. Я читаю, удаляю ненужные данные, очищаю и записываю в мета-хранилище кустов в разделах (день, месяц, год) в формате паркета со сжатием по умолчанию. Пока все хорошо, работает довольно быстро в течение пары минут (2-3). Теперь я снова читаю данные из метастаза и выполняю некоторые операции. По сути, я хочу вычислить все поездки из manhattanToJFK, чтобы в итоге было несколько UDF и количество. Это занимает вечность.

Тогда у меня есть небольшой набор данных <1MB данных о погоде. почти 50 столбцов, я делаю из этого новый DF только с 7 столбцами. Нужно найти некоторую корреляцию между поездками и погодой et c. Таким образом, объединение даты и затем корреляционная функция spark.ml для получения корреляционной матрицы. Это также занимает целую вечность. </p>

В общей сложности все это занимает 6 часов, чтобы бежать (я был в 4-5 часов, и я что-то испортил и пришлось перезапустить). Я не могу ждать так долго, чтобы проверить, точен ли он или нет.

Из того, что я читаю и понимаю, паркет должен работать значительно быстрее, чем чтение и работа с CSV, но я нахожу это наоборот. Поэтому я думаю, что делаю что-то не так. Может быть, некоторые настройки или настройки et c? Я начинающий с искрой и изучаю это сам. Так что, если я делаю ошибку, будь добр, потерпи меня. Любая помощь будет очень полезна.

Если я должен опубликовать какое-либо обновление или информацию, дайте мне знать. Я могу отредактировать его и опубликовать.

Спасибо

def analysis() = {
    var parquetDF = SparkObject.spark.read.parquet("spark-warehouse/location")
//      .cache()
    val manhattanTojfkDF = countManhattanToJKF(parquetDF)
    findCorrelation(manhattanTojfkDF)
  }

  def countManhattanToJKF(df:DataFrame):DataFrame = {
    var parquetDF = df
    //  val geojson = scala.io.Source.fromURL(this.getClass.getResource("/nyc-borough-boundaries-polygon.geojson")).mkString
    val geojson = scala.io.Source.fromURL(this.getClass.getResource("/NYC Taxi Zones.geojson")).mkString
    val features = geojson.parseJson.convertTo[FeatureCollection]
    val broadcastFeatures = SparkObject.spark.sparkContext.broadcast(features)
    val lonlatToZoneID = (longitude: Double, latitude: Double) => {
      val feature: Option[Feature] = broadcastFeatures.value.find(f => {
        f.geometry.contains(new Point(longitude, latitude))
      })
      feature.map(f => {
        f("location_id").convertTo[String]
      }).getOrElse("NA")
    }
    val latlonToZoneIDUDF = udf(lonlatToZoneID)

    parquetDF = parquetDF.withColumn("pickupZoneID", when(parquetDF("pickupZoneID") === "NA",
      latlonToZoneIDUDF(parquetDF("pickup_longitude"), parquetDF("pickup_latitude")))
      .otherwise(parquetDF("pickup_longitude")))

    parquetDF = parquetDF.withColumn("dropoffZoneID", when(parquetDF("dropoffZoneID") === "NA",
      latlonToZoneIDUDF(parquetDF("dropoff_longitude"), parquetDF("dropoff_latitude")))
      .otherwise(parquetDF("dropoff_longitude")))


    val boroughLookupID = (pickupID:String) => {
      val feature: Option[Feature] = broadcastFeatures.value.find(f => {
        f.properties("location_id").convertTo[String] == pickupID
      })
      feature.map(f => {
        f("borough").convertTo[String]
      }).getOrElse("NA")
    }

    val boroughUDF = udf(boroughLookupID)
    parquetDF = parquetDF.withColumn("pickupBorough", boroughUDF(parquetDF("pickupZoneID")))
    parquetDF = parquetDF.withColumn("dropoffBorough", boroughUDF(parquetDF("dropoffZoneID")))

    val manhattanToJFK = (borough:String, dropOffID:String) => {
      (borough == "Manhattan" && dropOffID == "132")
    }

    val manhattanToJFKUDF = udf(manhattanToJFK)
    parquetDF = parquetDF.withColumn("manhattanToJFK",
      manhattanToJFKUDF(parquetDF("pickupBorough"), parquetDF("dropoffZoneID")))

    val filteredDF =  parquetDF.filter(parquetDF("ManhattanToJFK") === true)
    val totalRidesFromManhattanTOJFK = filteredDF.count()
    println(totalRidesFromManhattanTOJFK)
    print(parquetDF.show())
    filteredDF
  }

  def findCorrelation(filteredDF:DataFrame) = {
    var weatherDF = SparkObject.spark.read.format("csv")
      .option("header", true)
      .load(URLs.weatherData:_*)

    weatherDF = weatherDF.select(weatherDF("DATE").cast("date"), weatherDF("AWND").cast("float"),
      weatherDF("SNOW").cast("float"), weatherDF("SNWD").cast("float"), weatherDF("TMIN").cast("float"),
      weatherDF("TMAX").cast("float"), weatherDF("PRCP").cast("float"))

     val joinedDF = weatherDF.join(filteredDF, weatherDF("DATE") === filteredDF("pickupDate"))
      .select(weatherDF("DATE"), weatherDF("AWND"), weatherDF("SNOW"), weatherDF("SNWD"), weatherDF("TMIN"),
        weatherDF("TMAX"), weatherDF("PRCP"))
    //    .cache()

    val ridesPerDay = joinedDF.groupBy("DATE").agg(count("DATE").alias("rides_per_day"))
    val cleanedDF =  ridesPerDay.join(joinedDF, "DATE").dropDuplicates().drop("DATE")
    cleanedDF.printSchema()

    val assembler = new VectorAssembler()
      .setInputCols(cleanedDF.columns)
      .setOutputCol("features")

    val corrFeatures = assembler.transform(cleanedDF)

    val Row(coeff1: Matrix) = Correlation.corr(corrFeatures, "features").head
    println(s"Pearson correlation matrix:\n $coeff1")

    val Row(coeff2: Matrix) = Correlation.corr(corrFeatures, "features", "spearman").head
    println(s"Spearman correlation matrix:\n $coeff2")
  }

SparkSession выглядит как

lazy val spark = {
    SparkSession
      .builder()
      .master("local[*]")
      .appName("NYCTaxiDataKlarna")
      .getOrCreate()
  }

, и я передаю -Xms4g -Xmx4g как Параметры виртуальной машины, поэтому она составляет 4,1 4,1 ГБ каждый

РЕДАКТИРОВАТЬ: Итак, я сейчас просто запускаю функцию manhatantojfk с небольшим изменением в конце, в основном сохраняя данные для куста. в следующий раз я могу начать оттуда. и он работает уже почти 5 часов и не заканчивается.

val dw = new DataWriter()
      dw.writeToHive(parquetDF, "parquet", "location_with_borough", "pickupDate")
      print(parquetDF.count())

      val filteredDF =  parquetDF.filter(parquetDF("ManhattanToJFK") === true)
      dw.writeToHive(parquetDF, "parquet", "manhattan_to_jfk", "pickupDate")
//      val totalRidesFromManhattanTOJFK = filteredDF.count()
//      println(totalRidesFromManhattanTOJFK)
//      print(parquetDF.show())
//      filteredDF

1 Ответ

0 голосов
/ 22 февраля 2020

Скорее всего, вы не связаны с IO, поэтому не имеет значения, в каком формате вы читаете. Что вам действительно нужно сделать, это найти план запроса (то есть реальную работу, которую выполняет Spark) и расшифровать его. Без плана запроса вы не можете сказать, является ли проблема начальным чтением или объединениями или группами или самим коррелятом. Хорошее первое место для поиска - вкладка sql сервера spark-history. Кроме того, обычно лучше сделать дамп предварительно обработанных данных в хранилище перед выполнением ML (он же cleanedDF), чтобы не выполнять повторную предварительную обработку постоянно.

«Я запускаю все в IntelliJ.»

также, как настроена свеча? Работаете ли вы с master = local [*]?

Edit: Итак, я бы предложил запустить spark-shell и провести некоторое исследование кода в стиле REPL. Если у вас установлен spark на локальном компьютере, запустите его из командной строки с помощью «spark-shell». Когда он запустится, вы увидите, что он распечатал URL-адрес сервера spark-history. Отсюда запустите ваш код и затем откройте сервер истории искр, чтобы выяснить, на что спарк тратит свое время.

ИМО, ваш код выглядит более сложным, чем нужно. Вы можете непреднамеренно выстрелить себе в ногу, и вы не узнаете об этом, пока не погрузитесь немного глубже.

➜  ~ spark-shell 
20/02/23 04:06:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.43.210:4040
Spark context available as 'sc' (master = local[*], app id = local-1582459622890).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

//enter code
...