Почему этот List [String] к кадру данных вызывает исключение NullPointerException в Spark Scala? - PullRequest
0 голосов
/ 30 ноября 2018

Следующая codenippet вызывает исключение NullPointerException.Я не уверен, происходит ли это исключение в некоторых строках или всегда, поскольку кадр данных огромен и не может точно указать строку.

def removeUnwantedLetters(str: String): String = {
    str.split("\\W+").filter(word => (word.matches("[a-z]+") && (word.length > 1))).mkString(" ")
}

val myudf = spark.udf.register("learningUDF", (f1: String, f2: String) => {
    if(f1 != null && f2 != null) {
        val preproList = List(removeUnwantedLetters(f2.toLowerCase));

        if(preproList > 0) {
            val key_items = preproList.toDF("Description")
        }
    }

    (1, 1)
})



mydataframe.withColumn("pv", myudf($"f1", $"f2")).show

Весь код огромен, поэтому извините, что не вставляетеВесь код здесь, старался изо всех сил, чтобы минимизировать сбой кода здесь.Ниже приводится исключение, которое я получаю по фактическому коду:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 274.0 failed 4 times, most recent failure: Lost task 0.3 in stage 274.0 (TID 23387, 10.62.145.186, executor 2): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string, string, string, string, string, string, string, string, string, string, string, string) => struct<_1:int,_2:double>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_26$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:254)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at $anonfun$1.apply(<console>:100)
    at $anonfun$1.apply(<console>:82)
    ... 22 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 66 elided
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string, string, string, string, string, string, string, string, string, string, string, string) => struct<_1:int,_2:double>)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_26$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:254)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  ... 3 more
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:100)
  at $anonfun$1.apply(<console>:82)
  ... 22 more

Методом проб и ошибок я обнаружил, что эта строка val key_items = preproList.toDF("Description") вызывает NPE.Потому что, если я просто изменю его на val key_items = preproList, он будет работать нормально.

Can anyone please let me know when would `val key_items = preproList.toDF("Description")` give a `NullPointerException`.

Обновление

Кажется, что мы не можем создать информационный кадр внутри UDF.Потому что я пытался заменить val key_items = preproList.toDF("Description") на val key_items = List(1,2,3,4).toDF("VL").И, к моему удивлению, произошел сбой за тем же исключением.

Разве невозможно создать временный фрейм данных внутри UDF?

Обновление 2

Я пытаюсь создатьвременный фрейм данных для использования JohnSnowLabs Norvig Модель коррекции орфографии с использованием конвейера следующим образом:

val nlpPipeline = new Pipeline().setStages(Array(
  new DocumentAssembler().setInputCol("Description").setOutputCol("document"),
  new Tokenizer().setInputCols("document").setOutputCol("tokens"),
  norvigspell.setInputCols("tokens").setOutputCol("Description_corrected"),
  new Finisher().setInputCols("Description_corrected")
))

val dbDF = preproList.toDF("Description")

val spellcorrectedDF = dbDF.transform(dbDF=> nlpPipeline.fit(dbDF).transform(dbDF))

1 Ответ

0 голосов
/ 30 ноября 2018

Ответ сортировки: Нет, вы не можете создать DataFrame (или Dataset) внутри UDF .UDF работают с отдельными значениями строк и поэтому должны возвращать простые значения, которые могут быть сохранены в новом столбце, представьте их как Вычисляемые столбцы . Если бы вы могли создать DataFrame внутри UDF, у него будет только одна строка, и вы будете создавать множество из них, по одному на строку родительского элемента DataFrame.

Теперь по вашему коду сложно сказать, что вы хотите сделать, и я вижу, что вы пытаетесь очистить какой-то символ, сохраняете его в значении key_items (как DataFrame) и никогда не используете его ...чтобы в итоге вернуть пару (1, 1) констант независимо от предыдущих вычислений ... Тот факт, что ваш UDF принимает 2 параметра, а вы используете только один , меня тоже озадачивает.

Iбудет догадываться, что вы хотите вычислить описание на основе значения одного данного столбца (вы используете только один), поэтому что-то вроде следующего даст вам нечто подобное:

def removeUnwantedLetters(str: String): String = {
    str.split("\\W+").filter(word => (word.matches("[a-z]+") && (word.length > 1))).mkString(" ")
}

val myudf = spark.udf.register("learningUDF", (f1: String) => {
    if(f1 != null) {
        removeUnwantedLetters(f2.toLowerCase)
    } else ""
})

// This seems to be the DataFrame you are looking for
val descriptionDF = mydataframe
  .withColumn("Description", myudf($"f2"))
  .select("Description")

С предыдущим Spark может создать столбец Description из вызова вашего UDF для всех значений DataFrame.Затем, используя .select("Description"), вы создаете новый DataFrame, который имеет только столбец Description.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...