Сопоставление значений ничего не возвращает в Scala Flink - PullRequest
0 голосов
/ 27 июня 2018

Я разрабатываю алгоритм дискретизации в мгновение ока, но у меня возникают проблемы с применением функции map.

Дискретизация сохраняется в V, что является

private[this] val V = Vector.tabulate(nAttrs)(i => IntervalHeap(nBins, i, s))

Этот вектор обновляется следующим способом:

private[this] def updateSamples(v: LabeledVector): Vector[IntervalHeap] = {
    val attrs = v.vector.map(_._2)
    // TODO: Check for missing values
    attrs
      .zipWithIndex
      .foreach {
        case (attr, i) =>
          if (V(i).nInstances < s) {
            V(i) insert (attr)
          } else {
            if (randomReservoir(0) <= s / (i + 1)) {
              val randVal = Random nextInt (s)
              V(i) replace (randVal, attr)
            }
          }
      }
    V
  }

И функция map применяется к каждому экземпляру набора данных функцией `updateSamples):

def discretize(data: DataSet[LabeledVector]) /*: DataSet[IntervalHeap]*/ = {
    val d = data map (x => updateSamples(x))
    log.debug(s"$V")
    d print
  }

Однако, когда я печатаю d, я получаю V пустым:

Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])

Я пытался не возвращать V в updateSamples и просто получить к нему доступ с driscretize после применения карты, но происходит то же самое. Если внутри updateSamples я печатаю значение V, я вижу, что оно обновляется.

Обновление

Если я не использую контейнер DataSet[T] и вместо этого использую Seq, например:

def discretize(data: Seq[LabeledVector]) /*: DataSet[IntervalHeap]*/ = {
    data map (x => updateSamples(x))
  }

Дискретность работает отлично.

Что может происходить?

Обновление 2

После нескольких дней поиска кажется, что проблема в коллекции гуавы MinMaxPriorityQueue. Тем не менее, я не могу найти ничего, что поможет мне сериализировать эту коллекцию, вот что я нашел до сих пор:

  • Как сериализовать коллекцию гуавы?
  • magro / kryo-serializer Это коллекция сериализаторов для коллекций гуавы, но MinMaxPriorityQeue там нет
  • Я пробовал это , которое состоит в регистрации kryo Сериализатора env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer]), но не работает.
  • Я также попытался добавить сериализатор по умолчанию Kryo, но не повезло env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
...