После нескольких дней изучения того, почему мое приложение Flink не работает должным образом Я пришел к выводу, что проблема заключается в MinMaxPriorityQueue
, который я использую.
Кажется, эта структура не сериализуема. Я пробовал несколько способов его сериализации:
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], classOf[ProtobufSerializer]);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
все они без удачи.
Однако я нашел это: Сериализация таблицы неизменяемых Guava
Есть ли эквивалент MinMaxPriorityQueue или способ его сериализации?
Обновление
Я перевёл Томаша в скалу:
class MinMaxPriorityQueueSerializer extends Serializer[MinMaxPriorityQueue[Object]] {
private[this] val log = LoggerFactory.getLogger(this.getClass)
setImmutable(false)
setAcceptsNull(false)
val OPTIMIZE_POSITIVE = true
override def read(kryo: Kryo, input: Input, aClass: Class[MinMaxPriorityQueue[Object]]): MinMaxPriorityQueue[Object] = {
log.error("Kryo READ")
val comparator: Ordering[Object] = kryo.readClassAndObject(input).asInstanceOf[Ordering[Object]]
val size = input.readInt(OPTIMIZE_POSITIVE)
val queue: MinMaxPriorityQueue[Object] = MinMaxPriorityQueue.orderedBy(comparator)
.expectedSize(size)
.create()
(0 to size).foreach(_ => queue.offer(kryo.readClassAndObject(input)))
queue
}
override def write(kryo: Kryo, output: Output, queue: MinMaxPriorityQueue[Object]): Unit = {
log.error("Kryo WRITE")
kryo.writeClassAndObject(output, queue.comparator)
val declaredSize = queue.size
output.writeInt(declaredSize, OPTIMIZE_POSITIVE)
val actualSize = queue.toArray.foldLeft(0) {
case (z, q) =>
kryo.writeClassAndObject(output, q)
z + 1
}
Preconditions.checkState(
declaredSize == actualSize,
"Declared size (%s) different than actual size (%s)", declaredSize, actualSize)
}
}
И установите Kryo в Flink, чтобы использовать этот сериализатор:
env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])
Однако кажется, что его никогда не вызывают, так как я нигде не вижу в логах выводов log.error("Kryo READ")
и log.error("Kryo WRITE")
И преобразование по-прежнему возвращает пустое MinMaxPriorityQueue, даже я его обновляю.
Обновление 2
Я реализовал SerializerTester, но я получаю bufferUnderflow:
object Main {
def main(args: Array[String]) {
val tester = new MinMaxPriorityQueueSerializerTester()
val inQueue: MinMaxPriorityQueue[java.lang.Double] = MinMaxPriorityQueue.create()
inQueue.add(1.0)
val outputStream = new ByteArrayOutputStream()
tester.serialize(outputStream, inQueue)
val inputStream = new ByteArrayInputStream(outputStream.toByteArray())
val outQueue: MinMaxPriorityQueue[java.lang.Double] = tester.deserialize(inputStream);
System.out.println(inQueue);
System.out.println(outQueue);
}
class MinMaxPriorityQueueSerializerTester {
val kryo = new Kryo
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy)
registerMinMaxSerializer();
// allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering
def registerMinMaxSerializer() {
kryo.addDefaultSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], new MinMaxPriorityQueueSerializer());
}
def serialize(out: OutputStream, queue: MinMaxPriorityQueue[java.lang.Double]) {
// try (Output output = new Output(out)) {
val output = new Output(out)
kryo.writeClassAndObject(output, queue)
// kryo.writeObject(output, queue)
//}
output.flush
}
def deserialize(in: InputStream): MinMaxPriorityQueue[java.lang.Double] = {
//try (Input input = new Input(in)) {
val input = new Input(in)
//kryo.readObject(input, classOf[MinMaxPriorityQueue[java.lang.Double]])
kryo.readClassAndObject(input).asInstanceOf[MinMaxPriorityQueue[java.lang.Double]]
//p}
}
}