Обновление метаданных для Dataframe при чтении файла паркета - PullRequest
1 голос
/ 08 ноября 2019

Я пытаюсь прочитать файл паркета как информационный фрейм, который будет периодически обновляться (путь равен /folder_name. При появлении новых данных старый путь к файлу паркета (/folder_name) будет переименовываться во временный путь, а затеммы объединяем как новые, так и старые данные и будем хранить по старому пути (/folder_name)

. Что произойдет, если у нас есть файл паркета как hdfs://folder_name/part-xxxx-xxx.snappy.parquet перед обновлением, а затем после обновления он изменяется на hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet

Проблема возникает, когда я пытаюсь прочитать файл паркета во время обновления

sparksession.read.parquet ("filename") => он принимает старый путьhdfs://folder_name/part-xxxx-xxx.snappy.parquet (путь существует)

когда вызывается действие на фрейме данных, оно пытается прочитать данные из hdfs://folder_name/part-xxxx-xxx.snappy.parquet, но из-за обновления имя файла изменилось, и я получаю следующую проблему

java.io.FileNotFoundException: файл не существует: hdfs://folder_name/part-xxxx-xxx.snappy.parquet Возможно, базовые файлы были обновлены. Вы можете явно аннулировать кэш в Spark, запустив 'REFRESH TABLE tableName'команда SQL или воссоздание соответствующего набора данных / DataFrame.

Я использую Spark 2.2

Кто-нибудь может мне помочь, как обновить метаданные?

Ответы [ 2 ]

0 голосов
/ 15 ноября 2019

Эта ошибка возникает, когда вы пытаетесь прочитать файл, который не существует.

Поправьте меня, если я ошибаюсь, но я подозреваю, что вы перезаписываете все файлы при сохранении нового фрейма данных (используя .mode("overwrite")). Пока этот процесс запущен, вы пытаетесь прочитать файл, который был удален, и это исключение выдается - это делает таблицу недоступной в течение определенного периода времени (во время обновления).

Насколько я знаю, естьнет прямого способа «обновить метаданные», как вы хотите.

Два (из нескольких возможных) способов решения этой проблемы:

1 - использовать режим добавления

Если вы простохотите добавить новый фрейм данных к старому, нет необходимости создавать временную папку и перезаписывать старый. Вы можете просто изменить режим сохранения с перезаписи на добавление. Таким образом, вы можете добавлять разделы в существующий файл Parquet без необходимости перезаписывать существующие.

df.write
  .mode("append")
  .parquet("/temp_table")

Это, безусловно, самое простое решение, и нет необходимости читать данные, которые уже были сохранены. Это, однако, не сработает, если вам нужно обновить старые данные (например, если вы делаете upsert). Для этого у вас есть опция 2:

2 - Использование представления Hive

Вы можете создать таблицы кустов и использовать представление для указания на самую последнюю (и доступную) таблицу.

Вот пример логики такого подхода:

Часть 1

  • Если представление <table_name> не существует, мысоздайте новую таблицу с именем <table_name>_alpha0 для хранения новых данных
  • После создания таблицы мы создадим представление <table_name> как select * from <table_name>_alpha0

Часть 2

  • Если представление <table_name> существует, нам нужно увидеть, на какую таблицу оно указывает (<table_name>_alphaN)

  • Вы выполняете все операцииВы хотите с новыми данными сохранить их как таблицу с именем <table_name>_alpha(N+1)

  • После создания таблицы мы изменим представление <table_name> на select * from <table_name>_alpha(N+1)

И пример кода:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._


//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')

def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
  if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {

    val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
      .filter("col_name == 'View Text'")
      .rdd

    if(rdd_desc.isEmpty()) {
      None
    }
    else {
      Option(
        rdd_desc.first()
          .get(1)
          .toString
          .toLowerCase
          .stripPrefix("select * from ")
      )
    }
  }
  else
    None
}

//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.

def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
  val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
  val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")

  new_df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression","snappy")
    .saveAsTable(nextAlphaTable)

  spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}

//An example on how to use this:

//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"

println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")

saveDataframe(spark, dbName, tableName, df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

val processed_df = df.unionByName(new_data) //Or other operations you want to do

println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

Результат:

Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  1|        I|
|  2|       am|
+---+---------+

Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  5|     with|
|  6|      new|
|  7|     data|
|  1|        I|
|  2|       am|
+---+---------+

Таким образом вы можете гарантировать, что версия представления <table_name> всегда будет доступна. Это также имеет преимущество (или нет, в зависимости от вашего случая) от поддержки предыдущих версий таблицы. то есть предыдущая версия <table_name_alpha1> будет <table_name_alpha0>

3 - бонус

Если вы можете обновить версию Spark, взгляните на Delta Lake (минимальная версия Spark: 2.4.2)

Надеюсь, это поможет:)

0 голосов
/ 15 ноября 2019
  1. Простым решением было бы сначала использовать df.cache.count, затем объединить его с новыми данными и записать в /folder_name в режиме overwrite. Вам не нужно будет использовать temp путь в этом случае.

  2. Вы упоминали, что переименовываете /folder_name в некоторый временный путь. Поэтому вы должны читать старые данные с этого временного пути, а не hdfs://folder_name/part-xxxx-xxx.snappy.parquet.

...