LinkedHashMap изменяется на HashMap и вылетает в операторах потока данных flink - PullRequest
2 голосов
/ 10 апреля 2019

С учетом этого фиктивного кода:

 1 case class MyObject(values:mutable.LinkedHashMap[String, String])

...

 2    implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
 3    implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
 4
 5    val env = StreamExecutionEnvironment.getExecutionEnvironment
 6
 7    env
 8      .fromElements("one")
 9      .map(str =>
10      {
11        val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12        val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14        obj
15      })
16      .map(obj =>
17      {
18        val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20        obj
21      })

Приложение вылетит из строки 18, за исключением:

Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap

Проблемы заключаются в том, что при сериализации / десериализации valuesэлемент меняет свой тип объекта, или, другими словами, LinkedHashMap превращается в HashMap.

Обратите внимание, что тот же код, что и в строке 18, прекрасно работает в строке 12.

При установкеточка останова в строке 12, отладчик / IntelliJ покажет obj.values как LinkedHashMap, однако точка останова в строке 18 покажет obj.values как HashMap в отладчике.

Что происходитВот?Как я могу это исправить?В конце концов, LinkedHashMap реализует Serializable?!

1 Ответ

2 голосов
/ 10 апреля 2019

Стандартный сериализатор Kryo Chill для LinkedHashMap не сохраняет тип карты и вместо этого десериализует данные в HashMap. Чтобы избежать этого, необходимо создать сериализатор для типа LinkedHashMap:

class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
  override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
    kryo.writeObject(output, `object`.size)

    for (elem <- `object`.iterator) {
      kryo.writeClassAndObject(output, elem._1)
      kryo.writeClassAndObject(output, elem._2)
    }
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
    val result = new mutable.LinkedHashMap[K, V]()
    val size = kryo.readObject(input, classOf[Int])
    for (_ <- 1 to size) {
      val key = kryo.readClassAndObject(input).asInstanceOf[K]
      val value = kryo.readClassAndObject(input).asInstanceOf[V]
      result.put(key, value)
    }

    result
  }
}

А затем зарегистрируйте его как Kryo Serializer:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())
...