Задание Spark прервано из-за сбоя этапа: java.lang.NullPointerException при загрузке файлов XML с использованием spark-xml - PullRequest
0 голосов
/ 06 января 2019

Заранее всем спасибо за помощь! Я очень новичок в Spark и использую Databricks (Runtime Version 5.0, включает Apache Spark 2.4.0, Scala 2.11).

Я пытался загрузить некоторые XML-файлы, используя spark-xml (2.11.0.5.0), и обнаружил следующее исключение sparkException:

org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 5 in stage 0.0 failed 4 times, most recent failure: Lost task 
5.3 in stage 0.0 (TID 17, 10.106.234.32, executor 0): 
java.lang.NullPointerException

at scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:37)
at com.databricks.spark.xml.XmlRecordReader.readUntilStartElement(XmlInputFormat.scala:156)
at com.databricks.spark.xml.XmlRecordReader.next(XmlInputFormat.scala:131)
at com.databricks.spark.xml.XmlRecordReader.nextKeyValue(XmlInputFormat.scala:119)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:236)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:124)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:459)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1401)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Странно, что не было проблем с загрузкой файлов за день до использования точно таких же конфигураций кластера:

  • Версия среды исполнения Databricks: 5.0 (включает Apache Spark 2.4.0, Scala 2.11)
  • Тип драйвера и рабочий: i3.xlarge - 30,5 ГБ памяти, 4 ядра, 1 DBU

Каждый из несжатых XML-файлов имеет размер от 70 до 130 МБ, и их 8. Я надеюсь загрузить их в один набор данных.

Я получил следующие данные и загрузил их в Databricks:

wget ftp://ftp.nlm.nih.gov/nlmdata/sample/medline/*.gz
gunzip *.gz

Вот код:

import com.databricks.spark.xml._
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, LongWritable}


def loadMedline(spark: SparkSession, path: String) = {
  import spark.implicits._

  /* conf: org.apache.hadoop.conf.Configuration = Configuration: 
   * core-default.xml, core-site.xml, mapred-default.xml, 
   * mapred-site.xml, yarn-default.xml, yarn-site.xml, 
   * hdfs-default.xml, hdfs-site.xml
   */

  @transient val conf = new Configuration()
  conf.set(XmlInputFormat.START_TAG_KEY, "<MedlineCitation>")
  conf.set(XmlInputFormat.END_TAG_KEY, "</MedlineCitation>")

  /* sc: org.apache.spark.SparkContext */
  val sc = spark.sparkContext

  /* records: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = 
   * dbfs:/FileStore/tables/ch7_medline/ NewHadoopRDD[0] at newAPIHadoopFile at command-2335143412865645:1
   */

  val records = sc.newAPIHadoopFile(path, classOf[XmlInputFormat],
    classOf[LongWritable], classOf[Text])

  /* returns: org.apache.spark.sql.Dataset[String] = [value: string] */
  records.map(line => line._2.toString).toDS()
}

/* medlineRaw: org.apache.spark.sql.Dataset[String] = [value: string] */
val medlineRaw = loadMedline(spark, "dbfs:/FileStore/tables/ch7_medline/")

medlineRaw.count()   // <--- got the error here

Еще раз большое спасибо! Некоторое время оглядывался по сторонам, но все равно не мог заставить его работать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...