Увеличение хеш-таблиц в MinHashLSH, снижение точности и f1 - PullRequest
0 голосов
/ 16 февраля 2019

Я использовал MinHashLSH с приблизительным значением сходства со Scala и Spark 2.4, чтобы найти границы между сетью.Предсказание ссылок на основе сходства документов.Моя проблема в том, что, хотя я увеличиваю хэш-таблицы в MinHashLSH, моя точность и оценка F1 снижаются.Все, что я уже прочитал для этого алгоритма, показывает мне, что у меня есть проблема.

Я пробовал различное количество хеш-таблиц, и я предоставил различное количество порогов сходства Жакара, но у меня точно такая же проблема, точность быстро снижается.Я также пробовал разные выборки моего набора данных, и ничего не изменилось.Мой рабочий процесс продолжается следующим образом: я объединяю все текстовые столбцы моего информационного кадра, который включает заголовок, авторов, журнал и реферат, а затем я размечаю объединенный столбец на слова.Затем я использую CountVectorizer для преобразования этого «пакета слов» в векторы.Затем я предоставляю этот столбец в MinHashLSH с некоторыми хеш-таблицами и, наконец, я делаю примерное сходство, чтобы найти похожие «документы», которые находятся под моим заданным порогом.Моя реализация следующая.

import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
import UnsupervisedLinkPrediction.BroutForce.join
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf, when}
import org.apache.spark.sql.types._


object lsh {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR) // show only errors

//    val cores=args(0).toInt
//    val partitions=args(1).toInt
//    val hashTables=args(2).toInt
//    val limit = args(3).toInt
//    val threshold = args(4).toDouble

    val cores="*"
    val partitions=1
    val hashTables=16
    val limit = 1000
    val jaccardDistance = 0.89

    val master = "local["+cores+"]"

    val ss = SparkSession.builder().master(master).appName("MinHashLSH").getOrCreate()
    val sc = ss.sparkContext

    val inputFile = "resources/data/node_information.csv"

    println("reading from input file: " + inputFile)
    println

    val schemaStruct = StructType(
      StructField("id", IntegerType) ::
        StructField("pubYear", StringType) ::
        StructField("title", StringType) ::
        StructField("authors", StringType) ::
        StructField("journal", StringType) ::
        StructField("abstract", StringType) :: Nil
    )

    // Read the contents of the csv file in a dataframe. The csv file contains a header.
    //    var papers = ss.read.option("header", "false").schema(schemaStruct).csv(inputFile).limit(limit).cache()

    var papers = ss.read.option("header", "false").schema(schemaStruct).csv(inputFile).limit(limit).cache()

    papers.repartition(partitions)
    println("papers.rdd.getNumPartitions"+papers.rdd.getNumPartitions)

    import ss.implicits._
    // Read the original graph edges, ground trouth
    val originalGraphDF = sc.textFile("resources/data/Cit-HepTh.txt").map(line => {
      val fields = line.split("\t")
      (fields(0), fields(1))
    }).toDF("nodeA_id", "nodeB_id").cache()

    val originalGraphCount = originalGraphDF.count()

    println("Ground truth count: " + originalGraphCount )

    val nullAuthor = ""
    val nullJournal = ""
    val nullAbstract = ""

    papers = papers.na.fill(nullAuthor, Seq("authors"))
    papers = papers.na.fill(nullJournal, Seq("journal"))
    papers = papers.na.fill(nullAbstract, Seq("abstract"))

    papers = papers.withColumn("nonNullAbstract", when(col("abstract") === nullAbstract, col("title")).otherwise(col("abstract")))
    papers = papers.drop("abstract").withColumnRenamed("nonNullAbstract", "abstract")
    papers.show(false)

        val filteredGt= originalGraphDF.as("g").join(papers.as("p"),(
          $"g.nodeA_id" ===$"p.id") || ($"g.nodeB_id" ===$"p.id")
        ).select("g.nodeA_id","g.nodeB_id").distinct().cache()

    filteredGt.show()

    val filteredGtCount = filteredGt.count()
    println("Filtered GroundTruth count: "+ filteredGtCount)

    //TOKENIZE

    val tokPubYear = new Tokenizer().setInputCol("pubYear").setOutputCol("pubYear_words")
    val tokTitle = new Tokenizer().setInputCol("title").setOutputCol("title_words")
    val tokAuthors = new RegexTokenizer().setInputCol("authors").setOutputCol("authors_words").setPattern(",")
    val tokJournal = new Tokenizer().setInputCol("journal").setOutputCol("journal_words")
    val tokAbstract = new Tokenizer().setInputCol("abstract").setOutputCol("abstract_words")

    println("Setting pipeline stages...")
    val stages = Array(
      tokPubYear, tokTitle, tokAuthors, tokJournal, tokAbstract
      //      rTitle, rAuthors, rJournal, rAbstract
    )

    val pipeline = new Pipeline()
    pipeline.setStages(stages)

    println("Transforming dataframe\n")
    val model = pipeline.fit(papers)
    papers = model.transform(papers)

    println(papers.count())
    papers.show(false)
    papers.printSchema()

    val udf_join_cols = udf(join(_: Seq[String], _: Seq[String], _: Seq[String], _: Seq[String], _: Seq[String]))

    val joinedDf = papers.withColumn(
      "paper_data",
      udf_join_cols(
        papers("pubYear_words"),
        papers("title_words"),
        papers("authors_words"),
        papers("journal_words"),
        papers("abstract_words")
      )
    ).select("id", "paper_data").cache()

    joinedDf.show(5,false)

    val vocabSize = 1000000
    val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("paper_data").setOutputCol("features").setVocabSize(vocabSize).setMinDF(10).fit(joinedDf)
    val isNoneZeroVector = udf({v: Vector => v.numNonzeros > 0}, DataTypes.BooleanType)
    val vectorizedDf = cvModel.transform(joinedDf).filter(isNoneZeroVector(col("features"))).select(col("id"), col("features"))
    vectorizedDf.show()

    val mh = new MinHashLSH().setNumHashTables(hashTables)
      .setInputCol("features").setOutputCol("hashValues")
    val mhModel = mh.fit(vectorizedDf)

    mhModel.transform(vectorizedDf).show()

    vectorizedDf.createOrReplaceTempView("vecDf")

    println("MinHashLSH.getHashTables: "+mh.getNumHashTables)

    val dfA = ss.sqlContext.sql("select id as nodeA_id, features from vecDf").cache()
    dfA.show(false)
    val dfB = ss.sqlContext.sql("select id as nodeB_id, features from vecDf").cache()
    dfB.show(false)

    val predictionsDF = mhModel.approxSimilarityJoin(dfA, dfB, jaccardDistance, "JaccardDistance").cache()

    println("Predictions:")
    val predictionsCount = predictionsDF.count()
    predictionsDF.show()
    println("Predictions count: "+predictionsCount)

        predictionsDF.createOrReplaceTempView("predictions")

        val pairs = ss.sqlContext.sql("select datasetA.nodeA_id, datasetB.nodeB_id, JaccardDistance from predictions").cache()
        pairs.show(false)

        val totalPredictions = pairs.count()

        println("Properties:\n")
        println("Threshold: "+threshold+"\n")
        println("Hahs tables: "+hashTables+"\n")
        println("Ground truth: "+filteredGtCount)
        println("Total edges found: "+totalPredictions +" \n")


        println("EVALUATION PROCESS STARTS\n")
        println("Calculating true positives...\n")

        val truePositives = filteredGt.as("g").join(pairs.as("p"),
          ($"g.nodeA_id" === $"p.nodeA_id" && $"g.nodeB_id" === $"p.nodeB_id") || ($"g.nodeA_id" === $"p.nodeB_id" && $"g.nodeB_id" === $"p.nodeA_id")
        ).cache().count()

       println("True Positives: "+truePositives+"\n")

        println("Calculating false positives...\n")

        val falsePositives = predictionsCount - truePositives

        println("False Positives: "+falsePositives+"\n")

        println("Calculating true negatives...\n")
        val pairsPerTwoCount = (limit *(limit - 1)) / 2

        val trueNegatives = (pairsPerTwoCount - truePositives) - falsePositives
        println("True Negatives: "+trueNegatives+"\n")

        val falseNegatives = filteredGtCount - truePositives

        println("False Negatives: "+falseNegatives)

        val truePN = (truePositives+trueNegatives).toFloat
        println("TP + TN sum: "+truePN+"\n")

        val sum = (truePN + falseNegatives+ falsePositives).toFloat
        println("TP +TN +FP+ FN sum: "+sum+"\n")

        val accuracy = (truePN/sum).toFloat
        println("Accuracy: "+accuracy+"\n")

        val precision = truePositives.toFloat / (truePositives+falsePositives).toFloat
        val recall = truePositives.toFloat/(truePositives+falseNegatives).toFloat

        val f1Score = 2*(recall*precision)/(recall+precision).toFloat
        println("F1 score: "+f1Score+"\n")

    ss.stop()

Я забыл вам сказать, что я выполняю этот код в кластере с 40 ядрами и 64 г оперативной памяти.Обратите внимание, что примерное объединение сходства (реализация Spark) работает с JACCARD DISTANCE, а не с JACCARD INDEX.Поэтому в качестве порога сходства я указываю РАССТОЯНИЕ JACCARD, которое в моем случае равно jaccardDistance = 1 - порог.(Порог = Индекс Жакара).

Я ожидал получить более высокую точность и оценку f1, пока увеличиваю хеш-таблицы.У вас есть идеи по поводу моей проблемы?

Спасибо всем заранее!

1 Ответ

0 голосов
/ 16 февраля 2019

Здесь есть несколько видимых проблем, и, возможно, более скрытых, поэтому просто перечислим несколько:

  • На самом деле LSH не является классификатором, и попытка оценить его, поскольку не имеет особого смысла, даже если вы предполагаете, что сходство текста каким-то образом является прокси для цитирования (а если оно велико).
  • Если проблему нужно было представить как проблему классификации, ее следует рассматривать как классификацию с несколькими метками (каждая статья можетцитировать или цитироваться несколькими источниками), а не классификацией нескольких классов, поэтому простая точность не имеет смысла.
  • Даже если это была классификация и ее можно было оценить как таковой, ваши расчеты не включают фактические негативы, которые нене соответствует порогу approxSimilarityJoin
  • Кроме того, установка порога равным 1 ограничивает объединения либо точными совпадениями, либо случаями хеш-конфликтов - следовательно, предпочтение LSH с более высокими коэффициентами коллизий.

Дополнительно:

  • Подход к обработке текста, который вы выбрали, довольно пешеходный ипредпочитает неспецифические функции (помните, что вы не оптимизируете свою фактическую цель, но сходство текста).
  • Такой подход, особенно рассматривая все как равные, отбрасывает большую часть полезной информации в наборе, но не ограничивается, временные отношения ..
...