Я использую Kryo 5.0.0-RC1.У меня есть следующее исключение:
java.lang.IndexOutOfBoundsException: индекс: 113, размер: 5 в java.util.ArrayList.rangeCheck (ArrayList.java:657) в java.util.ArrayList.get (ArrayList.java:433) на com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject (MapReferenceResolver.java:60)
Я видел эти исключения во многих местах с прошлой версией Kryo ине ожидал этого в этой версии, так как большая часть решения, по-видимому, была рассмотрена в этой версии, например, очистка вывода и так далее.
В моем случае это происходит во время подготовки.
Из того, что я понимаю, довольно сложно определить.Но те, кто знает Kryo, что означает исключение, поэтому я могу начать решать проблему.Когда в коде Kryo возникает это исключение, и каково его значение.
Просто для записи, в моем случае это очень странно.Исключение случается только через некоторое время.У меня есть конвейер данных, где я использую akka-stream, чтобы получить некоторые данные, передать их через kafka, а затем обработать их в крио.Все работает нормально, пока я не достигну 1 199 541 записей и немного больше обработан.Это всегда происходит вокруг того же количества обработанных записей.
Любая идея или намек на мой взгляд.
Akka-Stream
.mapAsyncUnordered(8){value =>
Future{
val kryo = kryoPool.obtain()
val outStream = new ByteArrayOutputStream()
val output = new Output(outStream, 4096)
kryo.writeClassAndObject(output, value)
output.close()
kryoPool.free(kryo)
new ProducerRecord[String, Array[Byte]]("NormalizedRawData", outStream.toByteArray)
}
}
.runWith(Producer.plainSink(producerSettings))
Spark Init
object KryoContext {
lazy val kryoPool = new Pool[Kryo](true, false, 16) {
protected def create(): Kryo = {
val cl = Thread.currentThread().getContextClassLoader()
val kryo = new Kryo()
kryo.setClassLoader(cl)
kryo.setRegistrationRequired(false)
kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
kryo
}
}
lazy val outputPool = new Pool[Output](true, false, 16) {
protected def create: Output = new Output(4096)
}
lazy val inputPool = new Pool[Input](true, false, 16) {
protected def create: Input = new Input(4096)
}
}
Использование Spark
def decodeData(rowOfBinaryList: List[Row], kryoPool: Pool[Kryo], inputPool: Pool[Input]): List[RawData] = {
val kryo = kryoPool.obtain()
val input = inputPool.obtain()
val data = rowOfBinaryList.map(r => r.getAs[Array[Byte]]("message")).map{ binaryMsg =>
input.setInputStream(new ByteArrayInputStream(binaryMsg))
val value = kryo.readClassAndObject(input).asInstanceOf[RawData]
input.close()
value
}
kryoPool.free(kryo)
data
}
Если у вас есть какие-либо подсказки, которые могут помочь, пожалуйста, просто предложите что-нибудь.