Привет всем!
У меня проблема с моим кодом scala и spark.Я пытаюсь реализовать модель логистической регрессии.Для этого мне пришлось реализовать две функции UDF для сбора своих функций.Проблема в том, что каждый раз, когда я пытаюсь вызвать функцию dataframe.show (), я получаю сообщение об ошибке:
Я думал, что, возможно, у меня былонулевые значения на моем dataframe, и я попытался вызвать dataframe.na.drop () для устранения вероятных нулевых значений.
Проблема существует и говорит, что не смогла выполнить пользовательскую функцию (anonfun $ 3: (массив, массив) => int).
Вот мой дырочный код:
val sc = spark.sparkContext
val data = sc.textFile("resources/data/training_set.txt").map(line =>{
val fields = line.split(" ")
(fields(0),fields(1), fields(2).toInt)
})
import spark.implicits._
val trainingDF = data.toDF("srcId","dstId", "label")
val infoRDD = spark.read.option("header","false").option("inferSchema","true").format("csv").load("resources/data/node_information.csv")
val infoDF = infoRDD.toDF("srcId","year","title","authors","jurnal","abstract")
println("Showing linksDF sample...")
trainingDF.show(5)
println("Rows of linksDF: ",trainingDF.count())
println("Showing infoDF sample...")
infoDF.show(2)
println("Rows of infoDF: ",infoDF.count())
println("Joining linksDF and infoDF...")
var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" === $"b.srcId")
println(joinedDF.count())
joinedDF = joinedDF.select($"a.srcId",$"a.dstId",$"a.label",$"b.year",$"b.title",$"b.authors",$"b.jurnal",$"b.abstract")
println("Renameming joinedDF...")
joinedDF = joinedDF
.withColumnRenamed("srcId","id_from")
.withColumnRenamed("dstId","id_to")
.withColumnRenamed("year","year_from")
.withColumnRenamed("title","title_from")
.withColumnRenamed("authors","authors_from")
.withColumnRenamed("jurnal","jurnal_from")
.withColumnRenamed("abstract","abstract_from")
var infoDfRenamed = joinedDF
.withColumnRenamed("id_from","id_from")
.withColumnRenamed("id_to","id_to")
.withColumnRenamed("year_from","year_to")
.withColumnRenamed("title_from","title_to")
.withColumnRenamed("authors_from","authors_to")
.withColumnRenamed("jurnal_from","jurnal_to")
.withColumnRenamed("abstract_from","abstract_to").select("id_to","year_to","title_to","authors_to","jurnal_to","jurnal_to")
var finalDF = joinedDF.as(("a")).join(infoDF.as("b"),$"a.id_to" === $"b.srcId")
finalDF = finalDF
.withColumnRenamed("year","year_to")
.withColumnRenamed("title","title_to")
.withColumnRenamed("authors","authors_to")
.withColumnRenamed("jurnal","jurnal_to")
.withColumnRenamed("abstract","abstract_to")
println("Dropping unused columns from joinedDF...")
finalDF = finalDF.drop("srcId")
println("Spliting title_from column into words...")
finalDF = finalDF.withColumn("title_from_words", functions.split(col("title_from"), "\\s+"))
println("Spliting title_to column into words...")
finalDF = finalDF.withColumn("title_to_words", functions.split(col("title_to"), "\\s+"))
println("Spliting authors_from column into words...")
finalDF = finalDF.withColumn("authors_from_words", functions.split(col("authors_from"), "\\s+"))
println("Spliting authors_to column into words...")
finalDF = finalDF.withColumn("authors_to_words", functions.split(col("authors_to"), "\\s+"))
println("Removing stopwords from title_from column...")
val remover = new StopWordsRemover().setInputCol("title_from_words").setOutputCol("title_from_words_f")
finalDF = remover.transform(finalDF)
println("Removing stopwords from title_to column...")
val remover2 = new StopWordsRemover().setInputCol("title_to_words").setOutputCol("title_to_words_f")
finalDF = remover2.transform(finalDF)
println("Removing stopwords from authors_from column...")
val remover3 = new StopWordsRemover().setInputCol("authors_from_words").setOutputCol("authors_from_words_f")
finalDF = remover3.transform(finalDF)
println("Removing stopwords from authors_to column...")
val remover4 = new StopWordsRemover().setInputCol("authors_to_words").setOutputCol("authors_to_words_f")
finalDF = remover4.transform(finalDF)
finalDF.count()
val udf_title_overlap=udf(findNumberCommonWordsTitle(_:Seq[String],_:Seq[String]))
val udf_authors_overlap = udf(findNumberCommonAuthors(_:Seq[String], _:Seq[String]))
println("Getting the number of common words between title_from and title_to columns using UDF function...")
finalDF = finalDF.withColumn("titles_intersection",udf_title_overlap(finalDF("title_from_words"),finalDF("title_to_words")))
println("Getting the number of common words between authors_from and authors_to columns using UDF function...")
finalDF = finalDF.withColumn("authors_intersection",udf_authors_overlap(finalDF("aut
hors_from_words"),finalDF("authors_to_words")))
finalDF.count()
finalDF = finalDF.withColumn("time_dist",abs($"year_from" -
$"year_to"))
println("Show schema of finalDF:\n")
finalDF.printSchema()
println("Dropping unused columns from finalDF...\n")
val finalCollsDF = finalDF.select("label","titles_intersection",
"authors_intersection", "time_dist")
println("Printing schema for finalDF...\n")
finalCollsDF.printSchema()
println("Creating features coll from finalDF using
VectorAssembler...\n")
val assembler = new VectorAssembler()
.setInputCols(Array("titles_intersection", "authors_intersection",
"time_dist"))
.setOutputCol("features")
val output = assembler.transform(finalCollsDF)
println("Printing final schema before trainning...\n")
output.printSchema()
output.na.drop()
println("Splitting dataset into trainingData and testData...\n")
val Array(trainingData, testData) = output.randomSplit(Array(0.6,
0.4))
val lr = new LogisticRegression()
.setFeaturesCol("features")
.setLabelCol("label")
.setPredictionCol("prediction")
.setRawPredictionCol("prediction_raw")
.setMaxIter(10)
val lr_model = lr.fit(trainingData)
val lr_results = lr_model.transform(testData)
val evaluator = new BinaryClassificationEvaluator()
.setRawPredictionCol("prediction_raw")
.setLabelCol("label")
.setMetricName("areaUnderPR")
println("RESULTS FOR LOGISTIC REGRESSION:")
println(evaluator.evaluate(lr_results))
}
Две функции UDF, которые я использую, следующие:
def findNumberCommonWordsTitle(title_from:Seq[String],
title_to:Seq[String]) ={
val intersection = title_from.intersect(title_to)
intersection.length
}
def findNumberCommonAuthors(author_from:Seq[String],
author_to:Seq[String])={
val intersection = author_from.intersect(author_to)
intersection.length
}
Я искал нулевые значения вручную, используя оператор foreach, но он тоже не работал.Как я могу решить эту проблему.Может быть, у меня есть другая проблема, которую я не могу найти.
Спасибо