Оптимизация преобразования Flink - PullRequest
0 голосов
/ 03 сентября 2018

У меня есть следующий метод, который вычисляет вероятность значения в DataSet:

/**
   * Compute the probabilities of each value on the given [[DataSet]]
   *
   * @param x single colum [[DataSet]]
   * @return Sequence of probabilites for each value
   */
  private[this] def probs(x: DataSet[Double]): Seq[Double] = {
        val counts = x.groupBy(_.doubleValue)
          .reduceGroup(_.size.toDouble)
          .name("X Probs")
          .collect

        val total = counts.sum

        counts.map(_ / total)
  }

Проблема заключается в том, что когда я отправляю свою работу Flink, которая использует этот метод, он вызывает Flink, чтобы убить работу из-за задачи TimeOut. Я выполняю этот метод для каждого атрибута на DataSet только с 40 000 экземпляров и 9 атрибутов.

Есть ли способ сделать этот код более эффективным?

После нескольких попыток я заставил его работать с mapPartition, этот метод является частью класса InformationTheory, который выполняет некоторые вычисления для вычисления энтропии, взаимной информации и т. Д. Так, например, вычисляется SymmetricalUncertainty как это:

/**
   * Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
   *
   * It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
   *
   * @param xy [[DataSet]] with two features
   * @return SU value
   */
  def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
    val su = xy.mapPartitionWith {
      case in ⇒
        val x = in map (_._2)
        val y = in map (_._1)

        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)

        Some(2 * mu / (Hx + Hy))
    }

    su.collect.head.head
  }

С этим я могу эффективно вычислять entropy, взаимную информацию и т. Д. Суть в том, что он работает только с уровнем параллелизма 1, проблема заключается в mapPartition.

Есть ли способ, которым я мог бы сделать что-то похожее на то, что я делаю здесь с SymmetricalUncertainty, но с любым уровнем параллелизма?

1 Ответ

0 голосов
/ 07 сентября 2018

Я наконец сделал это, не знаю, является ли это лучшим решением, но он работает с n уровнями параллелизма:

def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
    val su = xy.reduceGroup { in ⇒
        val invec = in.toVector
        val x = invec map (_._2)
        val y = invec map (_._1)

        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)

        2 * mu / (Hx + Hy)
    }

    su.collect.head
  } 

Вы можете проверить весь код на InformationTheory.scala и его тестах InformationTheorySpec.scala

...