Эта ошибка возникает, когда вы пытаетесь прочитать файл, который не существует.
Поправьте меня, если я ошибаюсь, но я подозреваю, что вы перезаписываете все файлы при сохранении нового фрейма данных (используя .mode("overwrite")
). Пока этот процесс запущен, вы пытаетесь прочитать файл, который был удален, и это исключение выдается - это делает таблицу недоступной в течение определенного периода времени (во время обновления).
Насколько я знаю, естьнет прямого способа «обновить метаданные», как вы хотите.
Два (из нескольких возможных) способов решения этой проблемы:
1 - использовать режим добавления
Если вы простохотите добавить новый фрейм данных к старому, нет необходимости создавать временную папку и перезаписывать старый. Вы можете просто изменить режим сохранения с перезаписи на добавление. Таким образом, вы можете добавлять разделы в существующий файл Parquet без необходимости перезаписывать существующие.
df.write
.mode("append")
.parquet("/temp_table")
Это, безусловно, самое простое решение, и нет необходимости читать данные, которые уже были сохранены. Это, однако, не сработает, если вам нужно обновить старые данные (например, если вы делаете upsert). Для этого у вас есть опция 2:
2 - Использование представления Hive
Вы можете создать таблицы кустов и использовать представление для указания на самую последнюю (и доступную) таблицу.
Вот пример логики такого подхода:
Часть 1
- Если представление
<table_name>
не существует, мысоздайте новую таблицу с именем <table_name>_alpha0
для хранения новых данных - После создания таблицы мы создадим представление
<table_name>
как select * from
<table_name>_alpha0
Часть 2
Если представление <table_name>
существует, нам нужно увидеть, на какую таблицу оно указывает (<table_name>_alphaN)
Вы выполняете все операцииВы хотите с новыми данными сохранить их как таблицу с именем <table_name>_alpha(N+1)
После создания таблицы мы изменим представление <table_name>
на select * from <table_name>_alpha(N+1)
И пример кода:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._
//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')
def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {
val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
.filter("col_name == 'View Text'")
.rdd
if(rdd_desc.isEmpty()) {
None
}
else {
Option(
rdd_desc.first()
.get(1)
.toString
.toLowerCase
.stripPrefix("select * from ")
)
}
}
else
None
}
//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.
def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")
new_df.write
.mode("overwrite")
.format("parquet")
.option("compression","snappy")
.saveAsTable(nextAlphaTable)
spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}
//An example on how to use this:
//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")
saveDataframe(spark, dbName, tableName, df)
println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show
val processed_df = df.unionByName(new_data) //Or other operations you want to do
println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)
println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show
Результат:
Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id| text|
+---+---------+
| 3| a|
| 4|dataframe|
| 1| I|
| 2| am|
+---+---------+
Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id| text|
+---+---------+
| 3| a|
| 4|dataframe|
| 5| with|
| 6| new|
| 7| data|
| 1| I|
| 2| am|
+---+---------+
Таким образом вы можете гарантировать, что версия представления <table_name>
всегда будет доступна. Это также имеет преимущество (или нет, в зависимости от вашего случая) от поддержки предыдущих версий таблицы. то есть предыдущая версия <table_name_alpha1>
будет <table_name_alpha0>
3 - бонус
Если вы можете обновить версию Spark, взгляните на Delta Lake (минимальная версия Spark: 2.4.2)
Надеюсь, это поможет:)