Kryo IndexOutOfBoundsException: индекс: 113, размер: 5 - PullRequest
0 голосов
/ 03 октября 2018

Я использую Kryo 5.0.0-RC1.У меня есть следующее исключение:

java.lang.IndexOutOfBoundsException: индекс: 113, размер: 5 в java.util.ArrayList.rangeCheck (ArrayList.java:657) в java.util.ArrayList.get (ArrayList.java:433) на com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject (MapReferenceResolver.java:60)

Я видел эти исключения во многих местах с прошлой версией Kryo ине ожидал этого в этой версии, так как большая часть решения, по-видимому, была рассмотрена в этой версии, например, очистка вывода и так далее.

В моем случае это происходит во время подготовки.

Из того, что я понимаю, довольно сложно определить.Но те, кто знает Kryo, что означает исключение, поэтому я могу начать решать проблему.Когда в коде Kryo возникает это исключение, и каково его значение.

Просто для записи, в моем случае это очень странно.Исключение случается только через некоторое время.У меня есть конвейер данных, где я использую akka-stream, чтобы получить некоторые данные, передать их через kafka, а затем обработать их в крио.Все работает нормально, пока я не достигну 1 199 541 записей и немного больше обработан.Это всегда происходит вокруг того же количества обработанных записей.

Любая идея или намек на мой взгляд.

Akka-Stream

.mapAsyncUnordered(8){value =>
      Future{
        val kryo = kryoPool.obtain()
        val outStream = new ByteArrayOutputStream()
        val output = new Output(outStream, 4096)
        kryo.writeClassAndObject(output, value)
        output.close()
        kryoPool.free(kryo)
        new ProducerRecord[String, Array[Byte]]("NormalizedRawData", outStream.toByteArray)
        }
      }
      .runWith(Producer.plainSink(producerSettings))

Spark Init

    object KryoContext {
        lazy val kryoPool = new Pool[Kryo](true, false, 16) {
          protected def create(): Kryo = {
            val cl = Thread.currentThread().getContextClassLoader()
            val kryo = new Kryo()
            kryo.setClassLoader(cl)
            kryo.setRegistrationRequired(false)
            kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
            kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
            kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
            kryo
          }
        }

        lazy val outputPool = new Pool[Output](true, false, 16) {
          protected def create: Output = new Output(4096)
        }

        lazy val inputPool = new Pool[Input](true, false, 16) {
          protected def create: Input = new Input(4096)
        }
}

Использование Spark

   def decodeData(rowOfBinaryList: List[Row], kryoPool: Pool[Kryo], inputPool: Pool[Input]): List[RawData] = {

    val kryo = kryoPool.obtain()
    val input = inputPool.obtain()
    val data = rowOfBinaryList.map(r => r.getAs[Array[Byte]]("message")).map{ binaryMsg =>
      input.setInputStream(new ByteArrayInputStream(binaryMsg))
      val value = kryo.readClassAndObject(input).asInstanceOf[RawData]
      input.close()
      value
    }
    kryoPool.free(kryo)
    data
}

Если у вас есть какие-либо подсказки, которые могут помочь, пожалуйста, просто предложите что-нибудь.

...