Как эффективно отладить долго работающее приложение Spark? - PullRequest
0 голосов
/ 01 мая 2018

У меня есть приложение Spark, которое очищает и подготавливает набор данных, а затем применяет алгоритм кластеризации K-средних к этому набору. После этого рассчитываются некоторые показатели полученных кластеров.

Естественно, вычисление векторных кластеров K-средних является задачей, которая имеет длительное время выполнения. При отладке расчета для метрик кластера я не могу быстро выполнить итерацию своего кода, поскольку кластеры рассчитываются при каждом выполнении. Как мне это решить?

У меня есть следующие идеи:

  • написание модульного теста для методов расчета метрики. Но было бы сложно смоделировать данные кластера как
  • Сериализация вычисленных K-средних векторных кластерных данных на диск

Любая помощь приветствуется

Код для справки:

def main(args: Array[String]): Unit = {

    // -- start of long running execution
    val lines   = sc.textFile("src/main/resources/stackoverflow/stackoverflow.csv")
    val raw     = rawPostings(lines)
    val grouped = groupedPostings(raw)
    val scored  = scoredPostings(grouped)
    val vectors = vectorPostings(scored)
   //    assert(vectors.count() == 2121822, "Incorrect number of vectors: " + vectors.count())

    val means   = kmeans(sampleVectors(vectors), vectors, debug = true)
    // -- end of long running execution

    val results = clusterResults(means, vectors) // < -- this method operates on the result of previous ops
    printResults(results)
  }

Реализация clusterResults:

def clusterResults(means: Array[(Int, Int)], vectors: RDD[(LangIndex, HighScore)]): Array[(String, Double, Int, Int)] = {
    // -- Note that means is quite intensive to compute
    val closest = vectors.map(p => (findClosest(p, means), p)) // -- (Int, (LangIndex, HighScore))
    val closestGrouped = closest.groupByKey() // -- (Int, Iter((LangIndex, HighScore))

    vectors.take(3).foreach(println)

    val median = closestGrouped.mapValues { vs =>
      // @todo: what does groupBy(identity) do?
      // Predef.idintity is equivalent ot x => x
      val langId: Int   = vs.map(_._1).groupBy(identity).maxBy(_._2.size)._1 // most common language in the cluster
      val langLabel: String = langs(langId / langSpread)
      val langPercent: Double = vs.map(_._1).count(_.equals(langId)) / vs.size // percent of the questions in the most common language (= number of questions in most common lang divided by total questions)
      val clusterSize: Int    = vs.size
      val medianScore: Int    = vs.map(_._2).

      (langLabel, langPercent, clusterSize, medianScore)
    }

    median.collect().map(_._2).sortBy(_._4)
  }
...