Как оценить minHashLSH в Spark со скалой? - PullRequest
0 голосов
/ 26 января 2019

У меня есть набор научных работ, в нем 27770 статей (узлов) и другой файл (файл графика) с исходными ребрами, длиной 352807 записей. Я хочу рассчитать minHashLSH, чтобы найти похожие документы и предсказать связи между двумя узлами! Ниже вы можете увидеть мою попытку реализовать это на искре с помощью Scala. Проблема, с которой я сталкиваюсь, заключается в том, что я не знаю, как оценить результаты!

def main(args: Array[String]): Unit = {
println("MinHash LSH")
Logger.getLogger("org").setLevel(Level.ERROR) // show only errors


val ss = SparkSession.builder().master("local[*]").appName("neighbors").getOrCreate()
val sc = ss.sparkContext

val inputFile = "resources/data/node_information.csv"

println("reading from input file: " + inputFile)
println

val schemaStruct = StructType(
  StructField("id", IntegerType) ::
    StructField("pubYear", StringType) ::
    StructField("title", StringType) ::
    StructField("authors", StringType) ::
    StructField("journal", StringType) ::
    StructField("abstract", StringType) :: Nil
)

// Read the contents of the csv file in a dataframe. The csv file contains a header.
var papers = ss.read.option("header", "false").schema(schemaStruct).csv(inputFile)

import ss.implicits._
// Read the original graph edges, ground trouth
val originalGraphDF = sc.textFile("resources/data/Cit-HepTh.txt").map(line => {
  val fields = line.split("\t")
  (fields(0), fields(1))
}).toDF("nodeA_id", "nodeB_id")

println("Original graph edges count: " + originalGraphDF.count())
originalGraphDF.printSchema()
originalGraphDF.show(5)

val t1 = System.nanoTime // Start point of the app

val nullAuthor = "NO AUTHORS"
val nullJournal = "NO JOURNAL"
val nullAbstract = "NO ABSTRACT"

papers = papers.na.fill(nullAuthor, Seq("authors"))
papers = papers.na.fill(nullJournal, Seq("journal"))
papers = papers.na.fill(nullAbstract, Seq("abstract"))

papers = papers.withColumn("nonNullAbstract", when(col("abstract") === nullAbstract, col("title")).otherwise(col("abstract")))
papers = papers.drop("abstract").withColumnRenamed("nonNullAbstract", "abstract")
papers.show()

papers = papers.na.drop()
val fraction = 0.1

papers = papers.sample(fraction, 12345L)
//    println(papers.count())

//TOKENIZE

val tokPubYear = new Tokenizer().setInputCol("pubYear").setOutputCol("pubYear_words")
val tokTitle = new Tokenizer().setInputCol("title").setOutputCol("title_words")
val tokAuthors = new RegexTokenizer().setInputCol("authors").setOutputCol("authors_words").setPattern(",")
val tokJournal = new Tokenizer().setInputCol("journal").setOutputCol("journal_words")
val tokAbstract = new Tokenizer().setInputCol("abstract").setOutputCol("abstract_words")

//REMOVE STOPWORDS

val rTitle = new StopWordsRemover().setInputCol("title_words").setOutputCol("title_words_f")
val rAuthors = new StopWordsRemover().setInputCol("authors_words").setOutputCol("authors_words_f")
val rJournal = new StopWordsRemover().setInputCol("journal_words").setOutputCol("journal_words_f")
val rAbstract = new StopWordsRemover().setInputCol("abstract_words").setOutputCol("abstract_words_f")

println("Setting pipeline stages...")
val stages = Array(
  tokPubYear, tokTitle, tokAuthors, tokJournal, tokAbstract,
  rTitle, rAuthors, rJournal, rAbstract
)

val pipeline = new Pipeline()
pipeline.setStages(stages)

println("Transforming dataframe")
val model = pipeline.fit(papers)
papers = model.transform(papers)

papers.show(5)

//newDf = node df
val newDf = papers.select("id", "pubYear_words", "title_words_f", "authors_words_f", "journal_words_f", "abstract_words_f")
newDf.show(5)
newDf.describe().show()

val udf_join_cols = udf(join(_: Seq[String], _: Seq[String], _: Seq[String], _: Seq[String], _: Seq[String]))

val joinedDf = newDf.withColumn(
  "paper_data",
  udf_join_cols(
    newDf("pubYear_words"),
    newDf("title_words_f"),
    newDf("authors_words_f"),
    newDf("journal_words_f"),
    newDf("abstract_words_f"
    )
  )
).select("id", "paper_data")

joinedDf.show(5)
joinedDf.printSchema()
println(joinedDf.count())

// Word count to vector for each wiki content
val vocabSize = 1000000
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("paper_data").setOutputCol("features").setVocabSize(vocabSize)
  .setMinDF(10).fit(joinedDf)

val vectorizedDf = cvModel.transform(joinedDf).select(col("id"), col("features"))
vectorizedDf.show()
println("Total entries: "+vectorizedDf.count())

val mh = new MinHashLSH().setNumHashTables(3)
  .setInputCol("features").setOutputCol("hashValues")
val mhModel = mh.fit(vectorizedDf)

mhModel.transform(vectorizedDf).show()

// Self Join
val threshold = 0.95

val predictinsDF = mhModel.approxSimilarityJoin(vectorizedDf, vectorizedDf, 1,"JaccardDistance")
  .select("datasetA.id","datasetB.id","JaccardDistance").filter("JaccardDistance >= "+threshold)
  .withColumnRenamed("datasetA.id","nodeA_id")
  .withColumnRenamed("datasetB.id","nodeB_id")

predictinsDF.show()
predictinsDF.printSchema()
println("Total edges found: "+predictinsDF.count())  }  

Исходный граф - это файл с формой nodeAId, nodeBId. Мои результаты представлены в форме nodeAId, nodeBId, JaccardShoity. Оба из них являются датафреймами. Как я могу оценить свои результаты и получить Точность или балл F1?

Я прочитал, как найти точность и оценку F1, поэтому я попытался создать функцию для их вычисления. Мой подход - код ниже.

def getStats(spark:SparkSession,nodeDF:DataFrame, pairsDF:DataFrame, predictionsDF:DataFrame, graphDF:DataFrame): Unit ={
Logger.getLogger("org").setLevel(Level.ERROR)

import spark.implicits._
val truePositives = graphDF.as("g").join(predictionsDF.as("p"),
  ($"g.nodeA_id" === $"p.nodeA_id" && $"g.nodeB_id" === $"p.nodeB_id") || ($"g.nodeA_id" === $"p.nodeB_id" && $"g.nodeB_id" === $"p.nodeA_id")
).count()

val df = pairsDF.as("p").join(graphDF.as("g"),
  ($"p.nodeA_id" === $"g.nodeA_id" && $"p.nodeB_id" === $"g.nodeB_id") || ($"p.nodeA_id" === $"g.nodeB_id" && $"p.nodeB_id" === $"g.nodeA_id")
).count()
println("True Positives: "+truePositives)

val falsePositives = predictionsDF.count() - truePositives
println("False Positives: "+falsePositives)

val trueNegatives = (pairsDF.count() - graphDF.count()) - falsePositives
println("True Negatives: "+trueNegatives)

val falseNegatives = graphDF.count()-truePositives
println("False Negatives: "+falseNegatives)

val truePN = (truePositives+trueNegatives).toFloat

val sum = (truePN + falseNegatives+ falsePositives).toFloat

val accuracy = (truePN/sum).toFloat
println("Accuracy: "+accuracy)

val precision = truePositives.toFloat / (truePositives+falsePositives).toFloat
val recall = truePositives.toFloat/(truePositives+falseNegatives).toFloat

val f1Score = 2*(recall*precision)/(recall+precision).toFloat
println("F1 score: "+f1Score) }  

Но, когда я попытаюсь запустить его, он никогда не закончится !! Я не знаю, как улучшить или исправить это, чтобы получить Точность и F1 балл. Есть ли более простой способ сделать это?

Спасибо всем вам!

1 Ответ

0 голосов
/ 27 января 2019

Существует несколько способов улучшить производительность выполнения:

  1. Кэширование: Если оно подходит для вашей настройки, вы можете кешировать nodeDF,pairsDF, predictionsDF фреймы данных перед вызовом getStats метода.Во второй части вашего кода одно и то же действие было выполнено для одного и того же кадра данных несколько раз graphDF.count().Поскольку искра следует ленивому методу оценки, будет выполняться повторное выполнение, поэтому вы можете сохранить это значение в переменной, чтобы его можно было использовать.

  2. Найдите виновника: В основном я иду по пути повышения производительности.Когда задание spark отправлено, sparkUI покажет весь план выполнения, созданный spark, и покажет, какая задача занимает больше времени и других ресурсов.Вам может потребоваться больше ресурсов или выполнить некоторую настройку, чтобы между исполнителями происходило меньше перетасовок.

  3. Подтвердите с оптимальным аргументом: Перед отправкой искрового задания убедитесь, что оптимальноеРесурс использует из настройки.Для получения дополнительной информации: оптимальное распределение ресурсов

...