Scala и Spark - логистическая регрессия - NullPointerException - PullRequest
0 голосов
/ 18 декабря 2018

Привет всем!

У меня проблема с моим кодом scala и spark.Я пытаюсь реализовать модель логистической регрессии.Для этого мне пришлось реализовать две функции UDF для сбора своих функций.Проблема в том, что каждый раз, когда я пытаюсь вызвать функцию dataframe.show (), я получаю сообщение об ошибке:

NullPointerException

Я думал, что, возможно, у меня былонулевые значения на моем dataframe, и я попытался вызвать dataframe.na.drop () для устранения вероятных нулевых значений.

Проблема существует и говорит, что не смогла выполнить пользовательскую функцию (anonfun $ 3: (массив, массив) => int).

Вот мой дырочный код:

val sc = spark.sparkContext

val data = sc.textFile("resources/data/training_set.txt").map(line =>{
  val fields = line.split(" ")
  (fields(0),fields(1), fields(2).toInt)
})
import spark.implicits._
val trainingDF = data.toDF("srcId","dstId", "label")
val infoRDD = spark.read.option("header","false").option("inferSchema","true").format("csv").load("resources/data/node_information.csv")

val infoDF = infoRDD.toDF("srcId","year","title","authors","jurnal","abstract")

println("Showing linksDF sample...")
trainingDF.show(5)
println("Rows of linksDF: ",trainingDF.count())

println("Showing infoDF sample...")
infoDF.show(2)
println("Rows of infoDF: ",infoDF.count())

println("Joining linksDF and infoDF...")
var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" === $"b.srcId")

println(joinedDF.count())

joinedDF = joinedDF.select($"a.srcId",$"a.dstId",$"a.label",$"b.year",$"b.title",$"b.authors",$"b.jurnal",$"b.abstract")

println("Renameming joinedDF...")
joinedDF = joinedDF
  .withColumnRenamed("srcId","id_from")
  .withColumnRenamed("dstId","id_to")
  .withColumnRenamed("year","year_from")
  .withColumnRenamed("title","title_from")
  .withColumnRenamed("authors","authors_from")
  .withColumnRenamed("jurnal","jurnal_from")
  .withColumnRenamed("abstract","abstract_from")

var infoDfRenamed = joinedDF
  .withColumnRenamed("id_from","id_from")
  .withColumnRenamed("id_to","id_to")
  .withColumnRenamed("year_from","year_to")
  .withColumnRenamed("title_from","title_to")
  .withColumnRenamed("authors_from","authors_to")
  .withColumnRenamed("jurnal_from","jurnal_to")
  .withColumnRenamed("abstract_from","abstract_to").select("id_to","year_to","title_to","authors_to","jurnal_to","jurnal_to")

var finalDF = joinedDF.as(("a")).join(infoDF.as("b"),$"a.id_to" === $"b.srcId")

finalDF = finalDF
  .withColumnRenamed("year","year_to")
  .withColumnRenamed("title","title_to")
  .withColumnRenamed("authors","authors_to")
  .withColumnRenamed("jurnal","jurnal_to")
  .withColumnRenamed("abstract","abstract_to")

println("Dropping unused columns from joinedDF...")
finalDF = finalDF.drop("srcId")


println("Spliting title_from column into words...")
finalDF = finalDF.withColumn("title_from_words", functions.split(col("title_from"), "\\s+"))
println("Spliting title_to column into words...")
finalDF = finalDF.withColumn("title_to_words", functions.split(col("title_to"), "\\s+"))

println("Spliting authors_from column into words...")
finalDF = finalDF.withColumn("authors_from_words", functions.split(col("authors_from"), "\\s+"))
println("Spliting authors_to column into words...")
finalDF = finalDF.withColumn("authors_to_words", functions.split(col("authors_to"), "\\s+"))

println("Removing stopwords from title_from column...")
val remover = new StopWordsRemover().setInputCol("title_from_words").setOutputCol("title_from_words_f")
finalDF = remover.transform(finalDF)

println("Removing stopwords from title_to column...")
val remover2 = new StopWordsRemover().setInputCol("title_to_words").setOutputCol("title_to_words_f")
finalDF = remover2.transform(finalDF)

println("Removing stopwords from authors_from column...")
val remover3 = new StopWordsRemover().setInputCol("authors_from_words").setOutputCol("authors_from_words_f")
finalDF = remover3.transform(finalDF)

println("Removing stopwords from authors_to column...")
val remover4 = new StopWordsRemover().setInputCol("authors_to_words").setOutputCol("authors_to_words_f")
finalDF = remover4.transform(finalDF)

finalDF.count() 

val udf_title_overlap=udf(findNumberCommonWordsTitle(_:Seq[String],_:Seq[String]))
val udf_authors_overlap = udf(findNumberCommonAuthors(_:Seq[String], _:Seq[String]))

println("Getting the number of common words between title_from and title_to columns using UDF function...")
finalDF = finalDF.withColumn("titles_intersection",udf_title_overlap(finalDF("title_from_words"),finalDF("title_to_words")))

println("Getting the number of common words between authors_from and authors_to columns using UDF function...")
finalDF = finalDF.withColumn("authors_intersection",udf_authors_overlap(finalDF("aut 
hors_from_words"),finalDF("authors_to_words")))

finalDF.count()

finalDF = finalDF.withColumn("time_dist",abs($"year_from" - 
$"year_to"))

println("Show schema of finalDF:\n")
finalDF.printSchema()

println("Dropping unused columns from finalDF...\n")
val finalCollsDF = finalDF.select("label","titles_intersection", 
"authors_intersection", "time_dist")

println("Printing schema for finalDF...\n")
finalCollsDF.printSchema()

println("Creating features coll from finalDF using 
VectorAssembler...\n")
val assembler = new VectorAssembler()
  .setInputCols(Array("titles_intersection", "authors_intersection", 
"time_dist"))
  .setOutputCol("features")

val output = assembler.transform(finalCollsDF)

println("Printing final schema before trainning...\n")
output.printSchema()
output.na.drop()

println("Splitting dataset into trainingData and testData...\n")
val Array(trainingData, testData) = output.randomSplit(Array(0.6, 
0.4))

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setRawPredictionCol("prediction_raw")
  .setMaxIter(10)

val lr_model = lr.fit(trainingData)

val lr_results = lr_model.transform(testData)

val evaluator = new BinaryClassificationEvaluator()
  .setRawPredictionCol("prediction_raw")
  .setLabelCol("label")
  .setMetricName("areaUnderPR")

println("RESULTS FOR LOGISTIC REGRESSION:")
println(evaluator.evaluate(lr_results))

}

Две функции UDF, которые я использую, следующие:

 def findNumberCommonWordsTitle(title_from:Seq[String], 
   title_to:Seq[String]) ={
   val intersection = title_from.intersect(title_to)
   intersection.length
    }

  def findNumberCommonAuthors(author_from:Seq[String], 
   author_to:Seq[String])={
  val intersection = author_from.intersect(author_to)
   intersection.length
  }

Я искал нулевые значения вручную, используя оператор foreach, но он тоже не работал.Как я могу решить эту проблему.Может быть, у меня есть другая проблема, которую я не могу найти.

Спасибо

1 Ответ

0 голосов
/ 19 декабря 2018

Spark позволяет значениям быть нулевыми в фреймах данных, поэтому вызов UDF для столбцов этого фрейма может привести к NullPointerExceptions, если, например, один или оба массива имеют значение null.

.UDF для null значений и обработайте их соответствующим образом, либо убедитесь, что столбец в кадре данных не может быть нулевым, и установите пустые значения в пустые массивы.

 def findNumberCommonAuthors(author_from:Seq[String], author_to:Seq[String])={
   if(author_from == null || author_to == null) 0
   else author_from.intersect(author_to).length
  }
...