Q: 1. Поддерживает ли это только формат файла Parquet или любые другие форматы файлов, такие как CSV, TXT-файлы. 2. Если порядок столбцов нарушается, будет ли Mergeschema выравнивать столбцы в правильном порядке, когда был создан или мы должны сделать это вручную, выбрав все столбцы
AFAIK Схема слияния поддерживается только паркетом, а не другим форматом, таким как csv, txt.
Mergeschema (spark.sql.parquet.mergeSchema
) выровняет столбцы в правильном порядке, даже если они распределены.
Пример из документации искры по объединение схемы паркета :
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
ОБНОВЛЕНИЕ: Реальный пример, данный вами в поле для комментариев ...
В: Будут ли добавлены новые столбцы - empage, empdept
после empid,empname,salary columns
?
Ответ: Да EMPAGE, EMPDEPT были добавлены после EMPID, EMPNAME, SALARY с указанием столбца вашего дня.
см. Полный пример.
package examples
import org.apache.log4j.Level
import org.apache.spark.sql.SaveMode
object CSVDataSourceParquetSchemaMerge extends App {
val logger = org.apache.log4j.Logger.getLogger("org")
logger.setLevel(Level.WARN)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("CSVParquetSchemaMerge")
.master("local")
.getOrCreate()
import spark.implicits._
val csvDataday1 = spark.sparkContext.parallelize(
"""
|empid,empname,salary
|001,ABC,10000
""".stripMargin.lines.toList).toDS()
val csvDataday2 = spark.sparkContext.parallelize(
"""
|empid,empage,empdept,empname,salary
|001,30,XYZ,ABC,10000
""".stripMargin.lines.toList).toDS()
val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday1)
println("first day data ")
frame.show
frame.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=1")
frame.printSchema
val frame1 = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday2)
frame1.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=2")
println("Second day data ")
frame1.show(false)
frame1.printSchema
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
println("Merged Schema")
mergedDF.printSchema
println("Merged Datarame where EMPAGE,EMPDEPT WERE ADDED AFER EMPID,EMPNAME,SALARY followed by your day column")
mergedDF.show(false)
}
Результат:
* 104 1 *
Дерево каталогов: