Как перезаписать файл паркета, откуда читается DataFrame в Spark - PullRequest
0 голосов
/ 19 сентября 2019

Это микрокосм проблемы, с которой я сталкиваюсь, где я получаю ошибку.Позвольте мне попытаться воспроизвести это здесь.

Я сохраняю DataFrame как parquet, но когда я перезагружаю файл DataFrame из parquet и сохраняю его снова как parquet, я получаю сообщение об ошибке.

valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

# Load it back
df = spark.read.format('parquet').load('.../temp')
# Save it back - This produces ERROR   
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

Сообщение об ошибке -

executor 22): java.io.FileNotFoundException: Запрошенный файл maprfs: ///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet не существует.Возможно, базовые файлы были обновлены.Вы можете явно аннулировать кэш в Spark, запустив команду «REFRESH TABLE tableName» в SQL или воссоздав соответствующий набор данных / DataFrame.

Другой вопрос SO решает эту проблему.Предложенное решение состояло в том, чтобы обновить таблицу, как показано ниже, но это не помогло.Проблема заключается в обновлении метаданных.Я не знаю, где я делаю ошибку.

df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

Обходной путь для этой проблемы: Не элегантный способ решить эту проблему - сохранить DataFrame как parquet файл с другим именем, затем удалите исходный файл parquet и, наконец, переименуйте этот parquet файл в старое имя.

# Workaround
import os
import shutil

# Load it back
df = spark.read.format('parquet').load('.../temp')

# Save it back as temp1, as opposed to original temp      
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')

# Delete the original parquet file
shutil.rmtree('.../temp')

# Renaming the parquet folder.
os.rename('.../temp1','.../temp')

Но проблема в том, что некоторые DataFrames довольно большие, и это может быть не лучшим способом справиться с этим.Не говоря уже о том, что переименование вызовет некоторые проблемы с метаданными, в которых я не уверен.

Ответы [ 2 ]

1 голос
/ 26 сентября 2019

Когда данные извлекаются из кэша, они, кажется, работают нормально.

val df = spark.read.format("parquet").load("temp").cache()

cache - это ленивая операция, которая не запускает никаких вычислений, мы должны добавить несколько фиктивных действий.

println(df.count()) //count over parquet files should be very fast  

Теперь должно работать:

df.repartition(1).write.mode(SaveMode.Overwrite).parquet("temp")
0 голосов
/ 19 сентября 2019

Попробуйте вот так

val df: DataFrame = Seq(("Male","2019-09-06"),("Female","2019-09-06"),("Male","2019-09-07")).toDF("sex", "date")

Запись

df.write.parquet("datanew1.parquet")

Чтение

val newDataDF = sqlContext.read.parquet("datanew1.parquet")

scala> df
res13: org.apache.spark.sql.DataFrame = [sex: string, date: string]

scala> newDataDF
res14: org.apache.spark.sql.DataFrame = [sex: string, date: string]
...