Improving AWS Glue code reuse: "User did not initialize spark context!"

To improve code reusability, I don't want to create the GlueContext object in the main method, and I want to avoid adding a glueContext parameter to the signatures of several methods. The previous code looks like this:

import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object WorldCount {

  def main(args: Array[String]): Unit = {
    val glueContext = new GlueContext(new SparkContext())
    val spark       = glueContext.getSparkSession
    func1(glueContext)
    func2(spark)
  }

  def func1(glueContext: GlueContext): Unit = ???

  def func2(spark: SparkSession): Unit = {
    import spark.implicits._
    // todo
  }
}
  1. I don't want to write the code that creates a glueContext object every time and then pass that glueContext object into each method

So I created a GlueJob trait with the following code:

import com.amazonaws.services.glue.util.{GlueArgParser, JsonOptions}
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

trait GlueJob {
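  // Mixing in this trait gives a job object a shared GlueContext and SparkSession,
  // both created lazily on first access, so they never have to be passed around.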

  lazy val glueContext: GlueContext  = new GlueContext(new SparkContext())
  lazy val spark      : SparkSession = glueContext.getSparkSession

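  /**
    * Resolve job arguments and return the values of `params`, in order
    */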
  def argValues(args: Array[String], params: Array[String]): Array[String] = {
    val m = GlueArgParser.getResolvedOptions(args, params)
    params.map(m(_))
  }

  /**
    * Get a DynamicFrame from the Glue Data Catalog
    */
  def get(database: String, tableName: String,
          redshiftTmpDir: Option[String] = None,
          pushDownPredicate: Option[String] = None): DynamicFrame =
    (redshiftTmpDir, pushDownPredicate) match {
      case (Some(x), Some(y)) =>
        glueContext.getCatalogSource(
          database = database,
          tableName = tableName,
          redshiftTmpDir = x,
          pushDownPredicate = y).getDynamicFrame()
      case (Some(x), None)    =>
        glueContext.getCatalogSource(
          database = database,
          tableName = tableName,
          redshiftTmpDir = x).getDynamicFrame()
      case (None, Some(y))    =>
        glueContext.getCatalogSource(
          database = database,
          tableName = tableName,
          pushDownPredicate = y).getDynamicFrame()
      case (None, None)       =>
        glueContext.getCatalogSource(
          database = database,
          tableName = tableName).getDynamicFrame()
    }

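  /**
    * Read JSON files from S3 into a DynamicFrame
    */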
  def readJson(path: String): DynamicFrame =
    read(List(path), "json")

  private def read(paths: List[String], format: String): DynamicFrame =
    glueContext.getSourceWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map(
        "paths" -> paths
        )),
      format = format
      ).getDynamicFrame()

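  // writeOrc / writeParquet / writeJson delegate to the private write helper below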
  def writeOrc(df: DataFrame, path: String,
               partitionKeys: Seq[String] = Seq.empty): Unit =
    write(df, path, "orc", partitionKeys)

  def writeParquet(df: DataFrame, path: String,
                   partitionKeys: Seq[String] = Seq.empty): Unit =
    write(df, path, "parquet", partitionKeys)

  def writeJson(df: DataFrame, path: String,
                partitionKeys: Seq[String] = Seq.empty): Unit =
    write(df, path, "json", partitionKeys)

  private def write(df: DataFrame, path: String,
                    format: String, partitionKeys: Seq[String]): Unit = {
    val options = if (partitionKeys.isEmpty)
                    Map("path" -> path)
                  else
                    Map("path" -> path, "partitionKeys" -> partitionKeys)

    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(options),
      format = format
      ).writeDynamicFrame(DynamicFrame(df, glueContext))
  }
}

This way I don't need to create a glueContext object in the main method, and I don't have to pass a glueContext or sparkSession object into each method or import spark.implicits._ over and over:

object WorldCount extends GlueJob {

  def main(args: Array[String]): Unit = {
    func1()
    func2()
  }

  def func1(): Unit = {
    // glueContext.getCatalogSource()
  }

  def func2(): Unit = {
    // import spark.implicits._
    // todo
  }
}

But when I tested it, I ran into a strange problem. Here is the test code:

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.starlight.data.glue.{GlueJob, S3Utils}
import org.apache.spark.sql.functions._

object Test extends GlueJob {

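  // glueContext and spark are inherited from the GlueJob trait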
  import spark.implicits._

  val partitionKeys = "year" :: "month" :: "day" :: Nil
  val params        = Array("diff_days", "input_bucket_name", "input_path", "output_bucket_name", "output_path")

  def main(args: Array[String]): Unit = {
    val Array(diffDays, inputBucketName,
              inputPath, outputBucketName,
              outputPath) = this.argValues(args, params)

    // Compute the (year, month, day) components of the date `diff_days` days before today
    val getDataDate = (diff_days: Int) => {
      val rightNow: Calendar = Calendar.getInstance()
      rightNow.setTime(new Date())
      rightNow.add(Calendar.DAY_OF_YEAR, diff_days * (-1))
      val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
      dateFormat.format(rightNow.getTime)
                .split("-").toList
    }

    for (d <- 0 to diffDays.toInt) {
      val year :: month :: day :: _ = getDataDate(d)

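      // Remove any previously written output for this partition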
      S3Utils.deleteFolder(outputBucketName, s"${outputPath}year=$year/month=$month/day=$day/")

      // Select all files in the TXT directory for parsing
      val inputFilePrefix  = s"$inputPath$year/$month/$day/TXT"
      val allInputFileName = S3Utils.listObjects(inputBucketName, inputFilePrefix)

      if (allInputFileName.nonEmpty) {
        val jsonDF = this.readJson(s"s3://$inputBucketName/$inputFilePrefix").toDF()
        var dataDF = jsonDF.withColumn("dataList", explode($"data"))

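        // Cast columns with null or array types to string before writing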
        dataDF.schema.fields.foreach(f => {
          f.dataType.typeName.toLowerCase match {
            case "null" | "array" => dataDF = dataDF.withColumn(f.name, dataDF(f.name).cast("string"))
            case _                =>
          }
        })

        this.writeOrc(dataDF, s"s3://$outputBucketName/$outputPath", partitionKeys)
      }
    }

  }

}

The error log:

java.lang.IllegalStateException: User did not initialize spark context!
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:485)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Continuous Logging: Shutting down cloudwatch appender.
...