Все, что вам действительно нужно сделать, это подсчитать путем агрегирования по data
, в зависимости от 2 значений, которые может принимать логическое значение. Остальное - простое вычисление, которое зависит только от этих двух значений.
val rdd = sc.parallelize(
Seq(Element(true,"a"),Element(false,"a"),Element(true,"a"),
Element(false,"b"),Element(false,"b"),Element(true,"b")))
val log2 = math.log(2)
// calculate an RDD[(String, (Int, Int))], first element of the tuple is the number of "true"s, and the second the number of "false"s
val entropy = rdd.map(e => (e.data, e.target)).aggregateByKey((0, 0))({
case ((t, f), target) => if (target) (t + 1, f) else (t, f + 1)
}, {
case ((t1, f1), (t2, f2)) => (t1 + t2, f1 + f2)
}).mapValues {
case (t, f) =>
val total = (t + f).toDouble
val trueRatio = t.toDouble / total
val falseRatio = f.toDouble / total
-trueRatio * math.log(trueRatio) / log2 + falseRatio * math.log(falseRatio) / log2
}
// entropy is an RDD[(String, Double)]
entropy foreach println
// (a,-0.1383458330929479)
// (b,0.1383458330929479)