производительность spark Join на кластере EMR - PullRequest
0 голосов
/ 14 мая 2018

У нас есть 3-узловый искровой EMR-кластер (m3Xlarge). Мы пытаемся объединить несколько больших таблиц размером 4 ГБ (250+ столбцов), а несколько небольших ссылочных таблиц (15) имеют 2-3 столбца в каждой. Поскольку мы используем искровое динамическое расположение, которое по умолчанию включено в EMR.

Таким образом, при записи в HDFS для сохранения результатов требуется более 1 часа (это потому, что мы используем coalesce (1) в конечном DataFrame).

Даже мы пытались использовать широковещательные соединения, но пока не повезло. Как мы можем улучшить производительность для вышеупомянутого?

Каково будет окончательное время оптимизации для вышеуказанного Процесса?

Какими могут быть возможные способы улучшения производительности?

Любая помощь будет оценена!

Вот моя функция соединения

def multiJoins(MasterTablesDF: DataFrame, tmpReferenceTablesDF_List: MutableList[DataFrame], tmpReferenceTableJoinDetailsList: MutableList[Array[String]], DrivingTable: String): DataFrame = {

// Define final output of Driving Table
var final_df: DataFrame = null

if (MasterTablesDF != null) {

  if (!MasterTablesDF.head(1).isEmpty && tmpReferenceTablesDF_List.length >= 1) {

    for (i <- 0 until tmpReferenceTablesDF_List.length) {

      val eachReferenceTableDF = tmpReferenceTablesDF_List(i)
      var eachJoinDetails = tmpReferenceTableJoinDetailsList(i)

      //for first ref table Join
      if (i == 0) {
        println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        if (eachJoinDetails(0).equals(eachJoinDetails(1))) {
          println("############## Driving table and Ref table Joining columns are same joining first Drive table ==>" + DrivingTable + "With Ref table ==>" + eachJoinDetails(3))
          //if reftable and Driving table have same join columns using seq() to remove duplicate columns after Joins
          final_df = MasterTablesDF.join(broadcast(eachReferenceTableDF), Seq(eachJoinDetails(0)), eachJoinDetails(2)) //.select(ReqCols.head, ReqCols.tail: _*)
        } else {
          //if the joining column names of the driving and ref tables are not same then
          //using  driving table join col and reftable join cols
          println("############### Driving table and Ref table joining columns are not same joining first Drive table ==>" + DrivingTable + "With Ref table ==>" + eachJoinDetails(3) + "\n")
          final_df = MasterTablesDF.join(broadcast(eachReferenceTableDF), MasterTablesDF(eachJoinDetails(0)) === eachReferenceTableDF(eachJoinDetails(1)), eachJoinDetails(2))

        }

      } //Joining Next reference table dataframes with final DF
      else {
        if (eachJoinDetails(0).equals(eachJoinDetails(1))) {
          println("###### drive table and another ref table join cols are same joining driving table ==>" + DrivingTable + "With RefTable" + eachJoinDetails(3))
          final_df = final_df.join(broadcast(eachReferenceTableDF), Seq(eachJoinDetails(0)), eachJoinDetails(2)) //.select(ReqCols.head, ReqCols.tail: _*)
          // final_df.unpersist()
        } else {
          println("######  drive table and another ref table join cols are not same joining driving table ==>" + DrivingTable + "With RefTable" + eachJoinDetails(3) + "\n")
          final_df = final_df.join(broadcast(eachReferenceTableDF), MasterTablesDF(eachJoinDetails(0)) === eachReferenceTableDF(eachJoinDetails(1)), eachJoinDetails(2))

        }
      }
    }

  }
}

return final_df

//Writing is too slow
//final_df.coalesce(1).write.format("com.databricks.spark.csv").option("delimiter", "|").option("header", "true")
      .csv(hdfsPath)

}

Это нормально? это из-за зацикливания?

1 Ответ

0 голосов
/ 14 мая 2018

Вероятно, Spark не может оптимизировать ваш очень длинный план выполнения настолько хорошо, насколько это возможно. У меня была такая же ситуация, и мы провели серию оптимизаций: 1) удалите все ненужные столбцы и отфильтруйте как можно скорее 2) «Материализовать» некоторые таблицы перед объединением, это поможет Spark нарушить происхождение и каким-то образом оптимизировать ваш поток (в нашем примере 2 sortJoins были заменены широковещательными соединениями, потому что Spark понял, что кадры данных очень малы) 3) Мы разбили все наборы данных по одному ключу и количеству разделов (сразу после прочтения). и некоторые другие оптимизации. Это сократило время работы с 45 минут до 4. Вам нужно внимательно посмотреть на Spark UI, там мы нашли много полезных идей для оптимизации (один из наших исполнителей высказался вместо 10, потому что все данные были разбиты на одна часть ..) и т.д .. Удачи!

...