Работа Spark Save занимает много времени - PullRequest
1 голос
/ 08 октября 2019

Я пытаюсь сохранить Dataframe в папку HDFS. Но мое спасение занимает много времени. Действие до этого - объединение двух таблиц с использованием Spark SQL. Нужно знать, почему сохранение имеет четыре этапа и как улучшить производительность. Я прикрепил список этапов здесь Spark UI Изображение задания .

Я также прикрепил фрагменты своего кода.

Искровой код:

Эта функция получает данные из основного класса, а переменная models получает информацию о таблице из XML. Сначала он получает данные для исходной таблицы, а затем пытается извлечь данные из других таблиц соединения.

    def sourceGen(spark: SparkSession,
                minBatchLdNbr: Int,
                maxBatchLdNbr: Int,
                batchLdNbrList: String,
                models: (GModel, TModel, NModel)): Unit = {
    val configJson = models._3
    val gblJson = models._1
    println("Source Loading started")
    val sourceColumns = configJson.transformationJob.sourceDetails.sourceSchema
    val query = new StringBuilder("select ")
    sourceColumns.map { SrcColumn =>
      if (SrcColumn.isKey == "nak") {
        query.append(
          "cast(" + SrcColumn.columnExpression + " as " + SrcColumn.columnDataType + ") as " + SrcColumn.columnName + ",")
      }
    }
    var tableQuery: String =
      if (!configJson.transformationJob.sourceDetails.sourceTableSchemaName.isEmpty) {
        if (!batchLdNbrList.trim.isEmpty)
          query.dropRight(1) + " from " + configJson.transformationJob.sourceDetails.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr + "or batch_ld_nbr in ( " + batchLdNbrList + " )"
        else
          query.dropRight(1) + " from " + configJson.transformationJob.sourceDetails.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr
      } else {
        if (!batchLdNbrList.trim.isEmpty)
          query.dropRight(1) + " from " + gblJson.gParams.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr + "or batch_ld_nbr in ( " + batchLdNbrList + " )"
        else
          query.dropRight(1) + " from " + gblJson.gParams.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr
      }
    if (minBatchLdNbr == 0 && maxBatchLdNbr == 0) {
      tableQuery = tableQuery.split("where")(0)
    }
    println("Time"+LocalDateTime.now());
    val tableQueryDf: DataFrame = spark.sql(tableQuery)
    println("tableQueryDf"+tableQueryDf);
    println("Time"+LocalDateTime.now());
    println("Source Loading ended")
    println("Parent Loading Started")
    val parentColumns = configJson.transformationJob.sourceDetails.parentTables
    val parentSourceJoinDF: DataFrame = if (!parentColumns.isEmpty) {
      parentChildJoin(tableQueryDf,
                      parentColumns,
                      spark,
                      gblJson.gParams.pSchemaName)
    } else {
      tableQueryDf
    }
    println("tableQueryDf"+tableQueryDf);
    println("Parent Loading ended")
    println("Key Column Generation Started")
    println("Time"+LocalDateTime.now());
    val arrOfCustomExprs = sourceColumns
      .filter(_.isKey.toString != "nak")
      .map(
        f =>
          functions
            .expr(f.columnExpression)
            .as(f.columnName)
            .cast(f.columnDataType))
    val colWithExpr = parentSourceJoinDF.columns.map(f =>
      parentSourceJoinDF.col(f)) ++ arrOfCustomExprs
    val finalQueryDF = parentSourceJoinDF.select(colWithExpr: _*)
    println("finalQueryDF"+finalQueryDF);
    println("Time"+LocalDateTime.now());
    keyGenUtils.writeParquetTemp(
      finalQueryDF,
      configJson.transformationJob.globalParams.hdfsInterimPath + configJson.transformationJob.sourceDetails.sourceTableName + "/temp_" + configJson.transformationJob.sourceDetails.sourceTableName
    )
    println("PrintedTime"+LocalDateTime.now());
    println("Key Column Generation Ended")
  }

Приведенный ниже код используется для извлечения данных из объединяемых таблиц.

private def parentChildJoin(tableQueryDf: DataFrame,
                              ptJoin: Array[ParentTables],
                              sparkSession: SparkSession,
                              gParentSchema: String): DataFrame = {
    if (ptJoin.isEmpty) {
      tableQueryDf
    } else {
      val parentJoin = ptJoin.head
      val columns = new StringBuilder("select ")
      for (ptCols <- parentJoin.columns) {
        columns.append(
          ptCols.columnExpression + " as " + ptCols.columnName + ",")
      }
      val statement = columns.dropRight(1)

      if (!parentJoin.pSchemaName.isEmpty) {
        statement.append(
          " from " + parentJoin.pSchemaName + "." + parentJoin.pTableName)
      } else {
        statement.append(" from " + gParentSchema + "." + parentJoin.pTableName)
      }
      println("Time"+LocalDateTime.now());
      println("parentJoin.pTableName"+parentJoin.pTableName);
      val pQueryDF =
        if (parentJoin.pTableName.equalsIgnoreCase("order_summary_si_fact_t")) {
          val ordCalDt = "ord_cal_dt"
          val distinctDates = tableQueryDf
            .selectExpr(ordCalDt)
            .distinct
            .collect
            .map(_.getAs[String](0))
          sparkSession.sql(statement.toString).where(col(ordCalDt).isin(distinctDates: _*)).distinct
        } else {
          sparkSession.sql(statement.toString).distinct
        }
      println("Time"+LocalDateTime.now());
      //val pQueryDF = sparkSession.sql(statement.toString).distinct
      println("statement-"+parentJoin.pTableName+"-"+statement);
      parentChildJoin(
        tableQueryDf.join(pQueryDF,
                          parentJoin.pJoinCondition.map(_.sourceKey).toSeq,
                          parentJoin.joinType),
        ptJoin.tail,
        sparkSession,
        gParentSchema)
    }
  }

Это функция для записи в HDFS.

    def writeParquetTemp(df: DataFrame, hdfsPath: String): Unit = {
    df.write.format("parquet").option("compression", "none").mode(SaveMode.Overwrite).save(hdfsPath)
  }

Sparkотправить конфигурацию:

    /usr/hdp/2.6.3.0-235/spark2/bin//spark-submit --master yarn --deploy-mode client --driver-memory 30G --executor-memory 25G --executor-cores 6 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=774857600 --conf spark.kryoserializer.buffer.max.mb=512 --conf spark.dynamicAllocation.maxExecutors=40 --conf spark.eventLog.enabled=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.parquet.binaryAsString=true  --conf spark.sql.broadcastTimeout=36000 --conf spark.sql.shuffle.partitions=500
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...