Искра MergeSchema на паркетных колоннах - PullRequest
1 голос
/ 17 апреля 2020

Для эволюции схемы Mergeschema может использоваться в Spark для форматов файлов Parquet, и у меня есть следующие пояснения по этому вопросу

Поддерживает ли это только формат файлов Parquet или любые другие форматы файлов, такие как csv, txt.

Если между ними будут добавлены новые дополнительные столбцы, я понимаю, Mergeschema переместит столбцы до последнего.

И если порядок столбцов нарушается, будет ли Mergeschema выравнивать столбцы по правильному порядку при его создании, или нам нужно сделать это вручную, выбрав все столбцы.

Обновление из комментария: например, если у меня есть схема, как показано ниже, и создать таблицу, как показано ниже - spark.sql("CREATE TABLE emp USING DELTA LOCATION '****'") empid,empname,salary====> 001,ABC,10000 и на следующий день, если я получу формат ниже empid,empage,empdept,empname,salary====> 001,30,XYZ,ABC,10000.

Будут ли добавлены новые столбцы - empage, empdept после empid,empname,salary columns?

1 Ответ

2 голосов
/ 17 апреля 2020

Q: 1. Поддерживает ли это только формат файла Parquet или любые другие форматы файлов, такие как CSV, TXT-файлы. 2. Если порядок столбцов нарушается, будет ли Mergeschema выравнивать столбцы в правильном порядке, когда был создан или мы должны сделать это вручную, выбрав все столбцы


AFAIK Схема слияния поддерживается только паркетом, а не другим форматом, таким как csv, txt.

Mergeschema (spark.sql.parquet.mergeSchema) выровняет столбцы в правильном порядке, даже если они распределены.

Пример из документации искры по объединение схемы паркета :

import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

ОБНОВЛЕНИЕ: Реальный пример, данный вами в поле для комментариев ...


В: Будут ли добавлены новые столбцы - empage, empdept после empid,empname,salary columns?


Ответ: Да EMPAGE, EMPDEPT были добавлены после EMPID, EMPNAME, SALARY с указанием столбца вашего дня.

см. Полный пример.

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SaveMode


object CSVDataSourceParquetSchemaMerge extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("CSVParquetSchemaMerge")
    .master("local")
    .getOrCreate()


  import spark.implicits._

  val csvDataday1 = spark.sparkContext.parallelize(
    """
      |empid,empname,salary
      |001,ABC,10000
    """.stripMargin.lines.toList).toDS()
  val csvDataday2 = spark.sparkContext.parallelize(
    """
      |empid,empage,empdept,empname,salary
      |001,30,XYZ,ABC,10000
    """.stripMargin.lines.toList).toDS()

  val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday1)

  println("first day data ")
  frame.show
  frame.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=1")
  frame.printSchema

  val frame1 = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday2)
  frame1.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=2")
  println("Second day data ")

  frame1.show(false)
  frame1.printSchema

  // Read the partitioned table
  val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
  println("Merged Schema")
  mergedDF.printSchema
  println("Merged Datarame where EMPAGE,EMPDEPT WERE ADDED AFER EMPID,EMPNAME,SALARY followed by your day column")
  mergedDF.show(false)


}


Результат:

* 104 1 *

Дерево каталогов:

enter image description here

...