Как изменить таблицу из SparkSQL на основе DataFrame, не удаляя таблицу кустов, как столбцы удаления / добавления? - PullRequest
2 голосов
/ 11 июля 2019

Я пытаюсь изменить таблицу кустов с помощью Spark, например, добавить столбцы или удалить столбцы из таблицы кустов на основе выходных данных Spark DataFrame.Ниже приведено то, что я пробовал, вроде огромного кода,

    def main(args: Array[String]): Unit = {
    implicit val spark = SparkSession.builder
      .appName("SchemaHandle")
      .enableHiveSupport
      .getOrCreate

    //Assume below is my generated DataFrame
    import spark.implicits._
    val dfSample = Seq(
      (12, "Dallas", "Texas", 55, "BOOK S","hello","Hellotwo"),
      (12, "SF", "CA", 25, "RULER","hello","Hellotwo"),
      (13, "NYC", "NY", 53, "PENCIL S","hello","Hellotwo"),
      (14, "Miami", "Fl", 45, "RULER","hello","Hellotwo"),
      (12, "Houston", "Texas", 75, "MARKER","hello","Hellotwo"),
      (11, "jersey", "NJ", 53, "WHITE NE R","hello","Hellotwo"),
      (19, "new orleans", "LO", 45, "HIGHLIGHTNER","hello","Hellotwo")
    ).toDF("id", "city", "state", "qty", "item","columnone","columntwo")

    try {
      spark.sql("truncate table database.schematest")
      println("Successfully truncated database.schematest")
    } catch {
      case _: Throwable => println("This Job is running for the very first time, so no table to truncate - We'll create the table below")
        dfSample.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(s"database.schematest")
        println("Output Table Saved to database.schematest")
    }

    //Assume this is Spark DF Schema.
    val seqone: Seq[StructField] = dfSample.schema
    //Assume this is Existing Table Schema.
    val seqtwo: Seq[StructField] = spark.table("database.schematest").schema

    //Get Cols- with Schema to be Added
    val diffedSeq = seqone diff seqtwo
    //Get Cols- with Schema to be Dropped
    val diffedSeqTwo = seqtwo diff seqone

    //Get Cols- names to just make the diff
    val seqonecolumns = dfSample.columns

    //Get Cols- names to just make the diff
    val seqtwocolumns = spark.table("dscoewrk_ing_qa.schematest").columns

    val diffedSeqArrayOne = seqonecolumns diff seqtwocolumns

    val diffedSeqArrayTwo = seqtwocolumns diff seqonecolumns

    var fixedAlterColumns: String = ""
    for (i <- diffedSeqArrayOne) {
      for (j <- diffedSeq) {
        if (i.equals(j.name)) {
          fixedAlterColumns +=""+j.name +" "+ datatypeCheckFunction(j.dataType.toString)+","
        }
      }
    }
    if(fixedAlterColumns.length>0) {
      println(s"Result---> ${fixedAlterColumns.substring(0, fixedAlterColumns.length - 1)}")
      //Lets add new columns to table database.schematest.
      spark.sql(s"ALTER TABLE database.schematest ADD COLUMNS (${fixedAlterColumns.substring(0, fixedAlterColumns.length - 1)})")
      println("Alter Table Success")
    }else{
      println("No Columns to Add")
    }

    println("------------------------------BREAK---------------------------")

    //Now lets think about dropping the columns
    val dfSampleCurrentTable:Seq[StructField] = spark.table("dscoewrk_ing_qa.schematest").schema
    //Since we cannot drop columns from Hive Table, lets do REPLACE COLUMNS.
    val dfSampleFinalDiff = dfSampleCurrentTable diff diffedSeqTwo
    dfSampleFinalDiff.foreach(println)
    val dfSampleFinalDiffColArray = (spark.table("database.schematest").columns) diff diffedSeqArrayTwo
    dfSampleFinalDiffColArray.foreach(println)
    var fixedDropColumns:String = ""

    for(i <- dfSampleFinalDiffColArray){
      println("The i is"+i)
      for(j <-dfSampleCurrentTable){
        println("This is j"+j)
        if(i.equals(j.name)){
          fixedDropColumns+=""+j.name +" "+ datatypeCheckFunction(j.dataType.toString)+","
        }
      }
    }
    //Let's drop the columns that aren't required.

    if(fixedDropColumns.length>0) {
      println(s"Result---> ${fixedDropColumns.substring(0, fixedDropColumns.length - 1)}")
      spark.sql(s"ALTER TABLE database.schematest REPLACE COLUMNS(${fixedDropColumns.substring(0,fixedDropColumns.length-1)})")
      println("Alter Drop Table Success")
    }else{
      println("No Columns to Drop")
    }

    //Now let's save the DF to Output in the Table. By using Append as below.
    dfSample.withColumn("mybool",functions.lit(null)).coalesce(50).write.format("parquet").mode(SaveMode.Append).insertInto("database.schematest")

    println("Saving output Table Successful.")

  }

   def datatypeCheckFunction(datatypePassed: String): String = {
    datatypePassed match {
      case "BinaryType" | "ByteType" | "DateType" | "NullType" | "StringType" | "TimestampType" => "String"
      case "BooleanType" => "boolean"
      case "DoubleType" | "FloatType" => "Double"
      case "IntegerType" | "ShortType" => "Int"
      case "LongType" => "BigInt"
      case _ => "String"
    }
  }
}

Я могу понять, что есть место для оптимизации, но по крайней мере для этого кода я вижу две проблемы: 1. Когда я запускаю вышеупомянутое задание Spark, Добавить столбцы успешно, но столбцы ЗАМЕНА терпят неудачу с ниже:

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: ALTER TABLE REPLACE COLUMNS(line 1, pos 0)

== SQL ==
ALTER TABLE database.schematest REPLACE COLUMNS(id Int,city String,state String,qty Int,item String,columnone String,columntwo String)

Предположим, что СМЕНИТЬ Столбцы сработало, удалит ли он также данные для этого отбрасываемого столбца?

Вот мой используемый оператор создания таблицы Hive:

create table schematest(`id` int, `city` string, `state` string, `qty` int, `mybool` boolean) stored as parquet

Любая помощьпризнателен, спасибо всем заранее.

1 Ответ

0 голосов
/ 11 июля 2019

Я только что видел этот абзац в Руководстве по кустам :

REPLACE COLUMNS удаляет все существующие столбцы и добавляет новый набор столбцов.Это может быть сделано только для таблиц с собственным SerDe (DynamicSerDe, MetadataTypedColumnsetSerDe, LazySimpleSerDe и ColumnarSerDe)

Кажется, что для паркета, как я понял, не поддерживается.

...