Я запускаю простой проект Spark в кластере EMR YARN, чтобы:
- прочитать текстовый файл на S3 в
RDD[String]
- , определить схему и преобразовать этот RDDв DF
Я выполняю mapPartition на RDD для преобразования этого RDD[String]
в RDD[Row]
.Моя проблема - я получаю java.Lang.NullPointerException
и не могу понять, в чем проблема.
В трассировке стека перечислены эти 2 номера строки в исходном коде -
- строка
rdd1.mapPartition
- в анонимной функции, строка с регистром совпадения, который соответствует обычному
Вот выдержка из трассировки стека -
Caused by: java.lang.NullPointerException
at packageA.Herewego$$anonfun$3.apply(Herewego.scala:107)
at packageA.Herewego$$anonfun$3.apply(Herewego.scala:88)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I 'мы пробовали -
- Ошибка возникает при работе в режиме кластера YARN, а не в локальном режиме (в моей IDE).Это заставило меня думать, что что-то не определено в Executor?Я переместил функцию
createrow
def в анонимную функцию def - она не сработала.
Вот кодовый блок
val rdd4: RDD[Row] = rdd1.mapPartitions((it:Iterator[String]) => {
def createrow(a: List[String]): Row = {
val format = new java.text.SimpleDateFormat("dd/MMM/yyyy HH:mm:ss Z")
val re1: Row = Row.apply(a.head)
val d: Date = format.parse(a.tail.mkString(" "))
val t = new Timestamp(d.getTime)
val re2: Row = Row.apply(t)
Row.merge(re1, re2)
}
var output: List[Row] = List()
while (it.hasNext) {
val data: String = it.next()
val res = data match {
case rx(ipadd, date, time) => createrow(List(ipadd, date, time))
case _ => createrow(List("0.0.0.0", "00/Jan/0000", "00:00:00 0"))
}
output = output :+ res
}
output.toIterator
}).persist(MEMORY_ONLY)
// Collect and Persist the RDD in Memory
val tmp = rdd4.collect()
Нужно ли передавать какие-либо переменныеили функции, используемые в mapPartition?Любые указатели в правильном направлении будут более чем оценены.