Определите, какой объект не сериализуем в Apache-Flink - PullRequest
0 голосов
/ 27 августа 2018

Я пишу преобразователь Flink, и у меня есть пользовательский объект Histogram со следующими атрибутами:

case class Histogram(
  nRows: Int,
  nCols: Int,
  min: Int,
  step: Double,
  private val countMatrix: Array[ArrayBuffer[Double]],
  private val cutMatrixL1: Array[ArrayBuffer[Double]],
  val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
  private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
  private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
  extends Serializable {
    ???
}

Это мой FitOperation:

implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] {
    override def fit(
                      instance: PIDiscretizerTransformer,
                      fitParameters: ParameterMap,
                      input: DataSet[LabeledVector]): Unit = {

      // get params...

      val metric = input.map { x ⇒
        // (instance, histrogram totalCount)
        (x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
      }.reduce { (m1, m2) ⇒
        // Update Layer 1
        val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)

        //         Update Layer 2 if neccesary
        val updatedL2 = if (m1._3 % l2updateExamples == 0) {
          updateL2(m1._1, updatedL1)
        } else updatedL1

        (m2._1, updatedL2, m1._3 + 1)
      }.map(_._2)

      //      instance.metricsOption = Some(metric)
    }
  }

Это хорошо работает, но если я раскомментирую последнюю строку: instance.metricsOption = Some(metric) я получу java.io.NotSerializableException: org.apache.flink.api.scala.DataSet

Как я могу найти, какой объект в моем классе Histogram вызывает проблему? Насколько я знаю, ArrayBuffer является сериализуемым, как и карта. Хотя я нашел этот ТАК вопрос:

Карта не может быть сериализована в Scala?

с надписью .mapValues не сериализуем, но я нигде не использую .mapValues.

1 Ответ

0 голосов
/ 30 августа 2018

Проблема в том, что вы имеете в виду instance.step внутри вашего MapFunction.instance относится к типу PIDiscretizerTransformer, который нельзя сериализовать.Таким образом, вам нужно вычислить шаг за пределами MapFunction и передать значение в функцию.Тогда ваша программа должна быть сериализуемой.

...