Flink CEP PojoSerializer неверное разрешение полиморфизма - PullRequest
0 голосов
/ 19 января 2019

В настоящее время у меня действительно странное поведение при печати результатов шаблона CEP.

Модель данных выглядит следующим образом:

  • Событие : (тип: String, отметка времени: Long)
  • VehicleRelated extends Событие: (vehicleId: Integer)
  • Позиция расширяет транспортное средство: (pos: целое число, направление: целое число)
  • Распознать extends VehicleRelated: (pos: целое число, id: целое число, направление: целое число)

Часть CEP выглядит следующим образом:

val pattern = Pattern
  .begin[VehicleRelated]("before")
  .subtype(classOf[Position])
  .next("recognize")
  .subtype(classOf[Recognize])
  .next("after")
  .subtype(classOf[Position])
  .within(Time.seconds(5))

val patternStream = CEP.pattern(actionEvents, pattern)
val recognitions = patternStream
  .select(pattern => {
    val s = pattern("recognize").head.asInstanceOf[Recognize]
    LOG.debug(s.toString)
    s
  })

recognitions.print("RECO")

Вывод журналов следующий:

14:45:27,286 DEBUG stoff.schnaps.RecognizingJob$ - Recognize(VehicleId: 2, Id: 601, Pos: 1601, Direction: 35, Add: Map())
RECO:8> Recognize(VehicleId: null, Id: 601, Pos: 1601, Direction: 35, Add: Map())

Теперь главный вопрос: почему атрибут vehicleId имеет значение NULL после того, как я возвращаю приведенный объект? Есть предложения?

Обновление Я провел некоторые исследования и обнаружил, что проблема в PojoSerializer. Функция копирования вызывается, и в строке 151 this.numFields неправильно. В подсчет входит только количество атрибутов самого класса Recognize, но без унаследованных классов, в данном случае Event и VehicleRelated. Тип атрибутов и метка времени тоже нулевые ..

1 Ответ

0 голосов
/ 20 января 2019

Проблема заключалась в том, что внутренний сериализатор POJO flink не смог правильно разрешить полиморфизм.

Поэтому я устанавливаю сериализатор Kyro по умолчанию с помощью:

val config = env.getConfig
config.enableForceKryo()
...