Состояние класса получает потерю между вызовами функций во Flink - PullRequest
0 голосов
/ 02 июля 2018

У меня есть этот класс:

case class IDADiscretizer(
  nAttrs: Int,
  nBins: Int = 5,
  s: Int = 5) extends Serializable {

  private[this] val log = LoggerFactory.getLogger(this.getClass)
  private[this] val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
  private[this] val randomReservoir = SamplingUtils.reservoirSample((1 to s).toList.iterator, 1)

  def updateSamples(v: LabeledVector): Vector[IntervalHeapWrapper] = {
    val attrs = v.vector.map(_._2)
    val label = v.label
    // TODO: Check for missing values
    attrs
      .zipWithIndex
      .foreach {
        case (attr, i) =>
          if (V(i).getNbSamples < s) {
            V(i) insertValue attr // insert
          } else {
            if (randomReservoir(0) <= s / (i + 1)) {
              //val randVal = Random nextInt s
              //V(i) replace (randVal, attr)
              V(i) insertValue attr
            }
          }
      }
    V
  }

  /**
   * Return the cutpoints for the discretization
   *
   */
  def cutPoints: Vector[Vector[Double]] = V map (_.getBoundaries.toVector)

  def discretize(data: DataSet[LabeledVector]): (DataSet[Vector[IntervalHeapWrapper]], Vector[Vector[Double]]) = {
    val r = data map (x => updateSamples(x))
    val c = cutPoints

    (r, c)
  }
}

Используя flink, я хотел бы получить контрольные точки после вызова discretize, но, похоже, информация, хранящаяся в V, теряется. Должен ли я использовать Broadcast как в этот вопрос ? Есть ли лучший способ получить доступ к состоянию класса?

Я пытался позвонить cutpoints двумя способами, один с:

def discretize(data: DataSet[LabeledVector]) = data map (x => updateSamples(x))

Тогда позвонили извне:

val a = IDADiscretizer(nAttrs = 4)
val r = a.discretize(dataSet)
r.print
val cuts = a.cutPoints

Здесь срезы пусты, поэтому я попытался вычислить дискретность, а также точки среза внутри discretize:

def discretize(data: DataSet[LabeledVector]) = {
    val r = data map (x => updateSamples(x))
    val c = cutPoints

    (r, c)
  }

И используйте это так:

val a = IDADiscretizer(nAttrs = 4)
val (d, c) = a.discretize(dataSet)
c foreach println

Но происходит то же самое.

Наконец, я также попытался сделать V полностью открытым:

val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))

Все еще пусто

Что я делаю не так?

Похожие вопросы:

Ответ

Благодаря @TillRohrmann я наконец-то сделал:

private[this] def computeCutPoints(x: LabeledVector) = {
    val attrs = x.vector.map(_._2)
    val label = x.label
    attrs
      .zipWithIndex
      .foldLeft(V) {
        case (iv, (v, i)) =>
          iv(i) insertValue v
          iv
      }
  }

  /**
   * Return the cutpoints for the discretization
   *
   */
  def cutPoints(data: DataSet[LabeledVector]): Seq[Seq[Double]] =
    data.map(computeCutPoints _)
      .collect
      .last.map(_.getBoundaries.toVector)

  def discretize(data: DataSet[LabeledVector]): DataSet[LabeledVector] =
data.map(updateSamples _)

А затем используйте это так:

val a = IDADiscretizer(nAttrs = 4)
val d = a.discretize(dataSet)
val cuts = a.cutPoints(dataSet)
d.print
cuts foreach println

Я не знаю, является ли это лучшим способом, но, по крайней мере, сейчас работает.

1 Ответ

0 голосов
/ 03 июля 2018

Принцип работы Flink заключается в том, что пользователь определяет операторы / пользовательские функции, которые работают с входными данными, поступающими из исходной функции. Для выполнения программы код пользователя отправляется в кластер Flink, где она выполняется. Результаты вычислений должны быть выведены в некоторую систему хранения через функцию приемника.

Из-за этого невозможно легко смешивать локальные и распределенные вычисления, как вы пытаетесь использовать свое решение. discretize определяет оператор map, который преобразует входные данные DataSet data. Эта операция будет выполнена, например, после того, как вы позвоните ExecutionEnvironment#execute или DataSet#print. Теперь код пользователя и определение для IDADiscretizer отправляются в кластер, где они создаются. Flink обновит значения в экземпляре IDADiscretizer, который отличается от экземпляра, установленного на клиенте.

...