Схема слияния с int и double не может быть решена при чтении файла паркета - PullRequest
0 голосов
/ 18 декабря 2018

У меня есть два файла паркета, один содержит целое поле myField, а другой содержит двойное поле myField.При попытке прочитать оба файла одновременно

val basePath = "/path/to/file/"
val fileWithInt = basePath + "intFile.snappy.parquet"
val fileWithDouble = basePath + "doubleFile.snappy.parquet"
val result = spark.sqlContext.read.option("mergeSchema", true).option("basePath", basePath).parquet(Seq(fileWithInt, fileWithDouble): _*).select("myField")

я получаю следующую ошибку

Caused by: org.apache.spark.SparkException: Failed to merge fields 'myField' and 'myField'. Failed to merge incompatible data types IntegerType and DoubleType

При передаче явной схемы

val schema = StructType(Seq(new StructField("myField", IntegerType)))
val result = spark.sqlContext.read.schema(schema).option("mergeSchema", true).option("basePath", basePath).parquet(Seq(fileWithInt, fileWithDouble): _*).select("myField")

Сбой приследующее

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)

При приведении к двойному

val schema = StructType(Seq(new StructField("myField", DoubleType)))

я получаю

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
    at org.apache.parquet.column.Dictionary.decodeToDouble(Dictionary.java:60)

Кто-нибудь знает какие-либо способы решения этой проблемы, кроме повторной обработки исходных данных.

1 Ответ

0 голосов
/ 26 декабря 2018

В зависимости от количества файлов, которые вы собираетесь прочитать, вы можете использовать один из этих двух подходов:

Это будет лучше для меньшего количества файлов паркета

def merge(spark: SparkSession, paths: Seq[String]): DataFrame = {
    import spark.implicits._

    paths.par.map {
      path =>
        spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType))
    }.reduce(_.union(_))
  }

ЭтоПодход будет лучше обрабатывать большое количество файлов, так как он будет сохранять короткие линии

def merge2(spark: SparkSession, paths: Seq[String]): DataFrame = {
    import spark.implicits._

    spark.sparkContext.union(paths.par.map {
      path =>
        spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType)).as[Double].rdd
    }.toList).toDF
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...