Искра DataFrame, за исключением исключения функции - PullRequest
0 голосов
/ 28 марта 2019

Я пытаюсь реализовать функцию удаления для паркета при использовании, кроме функции, я получаю исключение

org.apache.spark.sql.AnalysisException: разрешенные атрибуты (все атрибуты фрейма данных) отсутствуют во всех атрибутах фрейма данных. Проверьте, используются ли нужные атрибуты. Кроме {весь план sql}

Код не работает

//reading existing parquet data
val dfi=paths.collect().par.map(x=> {
  val df=spark.read.option("basePath", pathPrefix+finalFile).parquet(pathPrefix+finalFile+x)
  df.select(df.columns.sorted.map(col):_*)
}).reduce((x,y)=> x.union(y))

//using the keys to find which data is to be deleted
var deleteRecords=parquetDeleteDF.select(keys:_*).dropDuplicates().join(dfi,keys.map(x=>x.toString()))

//deleting data from existing data
val dfl=dfi.except(deleteRecords.select(deleteRecords.columns.sorted.map(col):_*))

Чудесно тот же код работает нормально, если я воссоздаю фрейм данных deleteRecords

Рабочий код

//reading existing parquet data
val dfi=paths.collect().par.map(x=> {
  val df=spark.read.option("basePath", pathPrefix+finalFile).parquet(pathPrefix+finalFile+x)
  df.select(df.columns.sorted.map(col):_*)
}).reduce((x,y)=> x.union(y))

//using the keys to find which data is to be deleted
var deleteRecords=parquetDeleteDF.select(keys:_*).dropDuplicates().join(dfi,keys.map(x=>x.toString()))

// recreating deleteRecords dataframe
deleteRecords=spark.createDataFrame(deleteRecords.rdd, deleteRecords.schema)

//deleting data from existing data
val dfl=dfi.except(deleteRecords.select(deleteRecords.columns.sorted.map(col):_*))

Я не могу понять, почему он выдает ошибку и как он работает при воссоздании фрейма данных

Примечание: имена ключей, которые используются для объединения, одинаковы в обоих фреймах данных, и после объединения они дедуплицируются. Также в ожидаемой функции оба dataframe имеют одинаковые имена атрибутов и тип данных.

Информация о версии: Spark 2.3.2 Скала 2.11.11

...