В этом ответе Оптимизация преобразования Флинка :
def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
val su = xy.reduceGroup { in ⇒
val invec = in.toVector
val x = invec map (_._2)
val y = invec map (_._1)
val mu = mutualInformation(x, y)
val Hx = entropy(x)
val Hy = entropy(y)
2 * mu / (Hx + Hy)
}
su.collect.head
}
Я написал функцию для вычисления симметричной неопределенности с ReduceGroup
. Но это медленно на больших наборах данных.
Я читал о Combinable GroupReduceFunctions в документации Флинка, и я пытаюсь написать GroupReduceFunction
для вычисления симметричной неопределенности:
class MyCombinableGroupReducer
extends GroupReduceFunction[(Double, Double), Double]
with GroupCombineFunction[(Double, Double), (Double, Double)]{
override def reduce(
in: Iterable[(Double, Double)],
out: Collector[Double]): Unit =
{
val x = in map(_._2)
val y = in map(_._1)
// collect...
}
override def combine(
in: Iterable[(Double, Double)],
out: Collector[(Double, Double)]): Unit =
{
// ...
}
}
Можно ли рассчитать эту меру параллельно? Я не знаю, что мне написать для функций reduce
и combine
.