У меня есть таблица кустов, разделенная меткой времени поверх файлов паркета с мгновенным преобразованием.в основном пути выглядят так:
s3:/bucketname/project/flowtime=0/
s3:/bucketname/project/flowtime=1/
s3:/bucketname/project/flowtime=2/
...
Я обнаружил некоторую несогласованность, учитывая эту таблицу.Проблема состоит в том, что из-за одного поля, которое дает LongType в некоторых схемах паркета, а String - в другом, выполнение запросов вызывает исключение ClassCastException.
Итак, я сейчас пытаюсь прочитать все мои файлы паркета и проверить их схемы, чтобы я мог их воссоздать.Я хочу сопоставить мои имена файлов со схемой ассоциированного паркета.так что у меня может быть:
filename | schema
s3:/bucketname/project/flowtime |StructField(StructField(Id,StringType,True),
|StructField(Date,StringType,True)
Поэтому я попытался использовать spark с Scala и функцией input_file_name из org.apache.spark.sql.functions, которые я обертываю в UDF.Это работает довольно хорошо.
val filename = (path: String) => path
val filenameUDF = udf(filename)
val df=sqlContext.parquetFile("s3a://bucketname/").select(filenameUDF(input_file_name())).toDF()
df.map(lines =>(lines.toString,sqlContext.read.parquet(lines.toString.replace("[","").replace("]","")).schema.toString)})
Это значит дать СДР [(String, String)] Только кажется, что часть, которая читает паркет в моей карте, генерирует исключение nullPointerException.
ERROR scheduler.TaskSetManager: Task 0 in stage 14.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 35, CONFIDENTIAL-SERVER-NAME, executor 13): java.lang.NullPointerException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1888)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1888)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Если у вас есть идея, почему паркет чтения не работает внутри карты, пожалуйста, дайте мне знать, почему, потому что обе части пары, которую я хочу создать (имя файла И схема), похоже, работаютхорошо, но присоединение к ним не
Если у вас также есть лучшие идеи, как решить проблему несоответствия в моих файлах паркета, из-за которых мой стол-улей поврежден, потому что я не вижу другого выбора, кроме как работать таким образом, потому что паркет неизменен и меняет улейметаданные не изменяют встроенные метаданные паркета в каждом файле.
Спасибо за внимание.Renaud