Scala Spark: рассчитать площадь под RO C для каждого идентификатора - PullRequest
1 голос
/ 05 марта 2020

У меня есть кадр данных в формате:

|      id |         prediction |  labels |
|595504101| 8.60031862132395E-5|      1.0|
|555968901|0.001435137738115154|      0.0|
|987301901|              5.0E-4|      0.0|

Я пытаюсь вычислить AU C -RO C для каждого из идентификаторов в этом кадре данных, где каждый идентификатор может иметь несколько строк прогнозы и метки. Я попытался использовать подход udf, чтобы сделать это -

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics 
import org.apache.spark.sql.types.{StructType, DoubleType}
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD

val evaluator = new BinaryClassificationEvaluator().setLabelCol("labels").setRawPredictionCol("predictions").setMetricName("areaUnderROC")

val predictionsAndLabels = aucDataFoo
                      .withColumn("predictionsAndLabels", struct("predictions", "labels"))
                      .groupBy("id")
                      .agg(collect_list("predictionsAndLabels").as("predictionsAndLabels"))

def test = (s: Seq[Row]) => {
    val f = s.map(x => {
        val (prediction: Double, label:Double) = (x(0), x(1))
    (prediction,label)
    })
    val predictionsAndLabels = f.toDS
    val auc = evaluator.evaluate(predictionsAndLabels)
auc
}

val testudf = udf(test, DoubleType)

val predictionsAndLabelsDF = predictionsAndLabels.withColumn("auc", testudf($"predictionsAndLabels"))

Это выдает ошибку

java.lang.StackOverflowError
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1108)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)...

Используя наивный метод, где я читаю список различных идентификаторов в память и использую его отобразить метод оценки BinaryClassificationEvaluator для каждого идентификатора, не масштабируется и не использует параллелизм Spark.

Решение, опубликованное для этого вопроса , рассчитывает AU C -PR, а не AU C -RO C. И я не могу использовать sklearn.

Есть ли способ расчета AU C -RO C для каждого из идентификаторов в scala

...