У меня есть следующий метод, который вычисляет вероятность значения в DataSet
:
/**
* Compute the probabilities of each value on the given [[DataSet]]
*
* @param x single colum [[DataSet]]
* @return Sequence of probabilites for each value
*/
private[this] def probs(x: DataSet[Double]): Seq[Double] = {
val counts = x.groupBy(_.doubleValue)
.reduceGroup(_.size.toDouble)
.name("X Probs")
.collect
val total = counts.sum
counts.map(_ / total)
}
Проблема заключается в том, что когда я отправляю свою работу Flink, которая использует этот метод, он вызывает Flink, чтобы убить работу из-за задачи TimeOut
. Я выполняю этот метод для каждого атрибута на DataSet
только с 40 000 экземпляров и 9 атрибутов.
Есть ли способ сделать этот код более эффективным?
После нескольких попыток я заставил его работать с mapPartition
, этот метод является частью класса InformationTheory
, который выполняет некоторые вычисления для вычисления энтропии, взаимной информации и т. Д. Так, например, вычисляется SymmetricalUncertainty
как это:
/**
* Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
*
* It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
*
* @param xy [[DataSet]] with two features
* @return SU value
*/
def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
val su = xy.mapPartitionWith {
case in ⇒
val x = in map (_._2)
val y = in map (_._1)
val mu = mutualInformation(x, y)
val Hx = entropy(x)
val Hy = entropy(y)
Some(2 * mu / (Hx + Hy))
}
su.collect.head.head
}
С этим я могу эффективно вычислять entropy
, взаимную информацию и т. Д. Суть в том, что он работает только с уровнем параллелизма 1, проблема заключается в mapPartition
.
Есть ли способ, которым я мог бы сделать что-то похожее на то, что я делаю здесь с SymmetricalUncertainty
, но с любым уровнем параллелизма?