Ошибка Spark MapPartition NullPointerException - PullRequest
0 голосов
/ 27 мая 2019

Я запускаю простой проект Spark в кластере EMR YARN, чтобы:

  • прочитать текстовый файл на S3 в RDD[String]
  • , определить схему и преобразовать этот RDDв DF

Я выполняю mapPartition на RDD для преобразования этого RDD[String] в RDD[Row].Моя проблема - я получаю java.Lang.NullPointerException и не могу понять, в чем проблема.

В трассировке стека перечислены эти 2 номера строки в исходном коде -

  • строкаrdd1.mapPartition
  • в анонимной функции, строка с регистром совпадения, который соответствует обычному

Вот выдержка из трассировки стека -

Caused by: java.lang.NullPointerException
    at packageA.Herewego$$anonfun$3.apply(Herewego.scala:107)
    at packageA.Herewego$$anonfun$3.apply(Herewego.scala:88)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I 'мы пробовали -

  • Ошибка возникает при работе в режиме кластера YARN, а не в локальном режиме (в моей IDE).Это заставило меня думать, что что-то не определено в Executor?Я переместил функцию createrow def в анонимную функцию def - она ​​не сработала.

Вот кодовый блок

val rdd4: RDD[Row] = rdd1.mapPartitions((it:Iterator[String]) => {

    def createrow(a: List[String]): Row = {

      val format = new java.text.SimpleDateFormat("dd/MMM/yyyy HH:mm:ss Z")

      val re1: Row = Row.apply(a.head)

      val d: Date = format.parse(a.tail.mkString(" "))
      val t = new Timestamp(d.getTime)
      val re2: Row = Row.apply(t)

      Row.merge(re1, re2)
    }

    var output: List[Row] = List()
    while (it.hasNext) {
      val data: String = it.next()
      val res = data match {
        case rx(ipadd, date, time) => createrow(List(ipadd, date, time))
        case _ => createrow(List("0.0.0.0", "00/Jan/0000", "00:00:00 0"))
      }
      output = output :+ res
    }
    output.toIterator
  }).persist(MEMORY_ONLY)

// Collect and Persist the RDD in Memory
  val tmp = rdd4.collect()

Нужно ли передавать какие-либо переменныеили функции, используемые в mapPartition?Любые указатели в правильном направлении будут более чем оценены.

...