У меня есть кадр данных в формате:
| id | prediction | labels |
|595504101| 8.60031862132395E-5| 1.0|
|555968901|0.001435137738115154| 0.0|
|987301901| 5.0E-4| 0.0|
Я пытаюсь вычислить AU C -RO C для каждого из идентификаторов в этом кадре данных, где каждый идентификатор может иметь несколько строк прогнозы и метки. Я попытался использовать подход udf, чтобы сделать это -
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.types.{StructType, DoubleType}
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD
val evaluator = new BinaryClassificationEvaluator().setLabelCol("labels").setRawPredictionCol("predictions").setMetricName("areaUnderROC")
val predictionsAndLabels = aucDataFoo
.withColumn("predictionsAndLabels", struct("predictions", "labels"))
.groupBy("id")
.agg(collect_list("predictionsAndLabels").as("predictionsAndLabels"))
def test = (s: Seq[Row]) => {
val f = s.map(x => {
val (prediction: Double, label:Double) = (x(0), x(1))
(prediction,label)
})
val predictionsAndLabels = f.toDS
val auc = evaluator.evaluate(predictionsAndLabels)
auc
}
val testudf = udf(test, DoubleType)
val predictionsAndLabelsDF = predictionsAndLabels.withColumn("auc", testudf($"predictionsAndLabels"))
Это выдает ошибку
java.lang.StackOverflowError
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1108)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)...
Используя наивный метод, где я читаю список различных идентификаторов в память и использую его отобразить метод оценки BinaryClassificationEvaluator для каждого идентификатора, не масштабируется и не использует параллелизм Spark.
Решение, опубликованное для этого вопроса , рассчитывает AU C -PR, а не AU C -RO C. И я не могу использовать sklearn.
Есть ли способ расчета AU C -RO C для каждого из идентификаторов в scala