NullPointerException при использовании Word2VecModel с UserDefinedFunction - PullRequest
0 голосов
/ 27 апреля 2018

Я пытаюсь передать объект модели word2vec в мой иск udf. В основном у меня есть набор тестов с идентификаторами фильмов, и я хочу передать идентификаторы вместе с объектом модели, чтобы получить массив рекомендуемых фильмов для каждой строки.

def udfGetSynonyms(model: org.apache.spark.ml.feature.Word2VecModel) = 
     udf((col : String)  => {
          model.findSynonymsArray("20", 1)
})

однако это дает мне исключение нулевого указателя. Когда я запускаю model.findSynonymsArray ("20", 1) за пределами udf, я получаю ожидаемый ответ. По какой-то причине он не понимает что-то о функции внутри udf, но может запустить ее за пределами udf.

Примечание: я добавил здесь «20», чтобы получить фиксированный ответ, чтобы посмотреть, сработает ли это. То же самое происходит, когда я заменяю «20» на «col.

».

Спасибо за помощь!

StackTrace:

SparkException: Job aborted due to stage failure: Task 0 in stage 23127.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23127.0 (TID 4646648, 10.56.243.178, executor 149): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$udfGetSynonyms1$1: (string) => array<struct<_1:string,_2:double>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:49)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:126)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:111)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:350)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.Word2VecModel.findSynonymsArray(Word2Vec.scala:273)
at linebb57ebe901e04c40a4fba9fb7416f724554.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$udfGetSynonyms1$1.apply(command-232354:7)
at linebb57ebe901e04c40a4fba9fb7416f724554.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$udfGetSynonyms1$1.apply(command-232354:4)
... 12 more

Ответы [ 2 ]

0 голосов
/ 30 июня 2019

Я думаю, что эта проблема возникает, потому что wordVectors является временной переменной

class Word2VecModel private[ml] (
    @Since("1.4.0") override val uid: String,
    @transient private val wordVectors: feature.Word2VecModel)
  extends Model[Word2VecModel] with Word2VecBase with MLWritable {

Я решил эту проблему, передав w2vModel.getVectors и заново создав модель Word2VecModel внутри каждого раздела

0 голосов
/ 27 апреля 2018

API SQL и udf немного ограничены, и я не уверен, есть ли способ использовать пользовательские типы в качестве столбцов или в качестве входных данных для udfs. Немного погуглив, ничего полезного не получилось.

Вместо этого вы можете использовать DataSet или RDD API и просто использовать обычную функцию Scala вместо udf, что-то вроде:

val model: Word2VecModel = ...
val inputs: DataSet[String] = ...
inputs.map(movieId => model.findSynonymsArray(movieId, 10))

В качестве альтернативы, я полагаю, вы могли бы сериализовать модель в строку и из строки, но это выглядит намного уродливее.

...