Я пытаюсь реализовать функцию удаления для паркета при использовании, кроме функции, я получаю исключение
org.apache.spark.sql.AnalysisException: разрешенные атрибуты (все атрибуты фрейма данных) отсутствуют во всех атрибутах фрейма данных. Проверьте, используются ли нужные атрибуты.
Кроме
{весь план sql}
Код не работает
//reading existing parquet data
val dfi=paths.collect().par.map(x=> {
val df=spark.read.option("basePath", pathPrefix+finalFile).parquet(pathPrefix+finalFile+x)
df.select(df.columns.sorted.map(col):_*)
}).reduce((x,y)=> x.union(y))
//using the keys to find which data is to be deleted
var deleteRecords=parquetDeleteDF.select(keys:_*).dropDuplicates().join(dfi,keys.map(x=>x.toString()))
//deleting data from existing data
val dfl=dfi.except(deleteRecords.select(deleteRecords.columns.sorted.map(col):_*))
Чудесно тот же код работает нормально, если я воссоздаю фрейм данных deleteRecords
Рабочий код
//reading existing parquet data
val dfi=paths.collect().par.map(x=> {
val df=spark.read.option("basePath", pathPrefix+finalFile).parquet(pathPrefix+finalFile+x)
df.select(df.columns.sorted.map(col):_*)
}).reduce((x,y)=> x.union(y))
//using the keys to find which data is to be deleted
var deleteRecords=parquetDeleteDF.select(keys:_*).dropDuplicates().join(dfi,keys.map(x=>x.toString()))
// recreating deleteRecords dataframe
deleteRecords=spark.createDataFrame(deleteRecords.rdd, deleteRecords.schema)
//deleting data from existing data
val dfl=dfi.except(deleteRecords.select(deleteRecords.columns.sorted.map(col):_*))
Я не могу понять, почему он выдает ошибку и как он работает при воссоздании фрейма данных
Примечание: имена ключей, которые используются для объединения, одинаковы в обоих фреймах данных, и после объединения они дедуплицируются. Также в ожидаемой функции оба dataframe имеют одинаковые имена атрибутов и тип данных.
Информация о версии:
Spark 2.3.2
Скала 2.11.11