Шаблон Apache Flink CEP для нескольких типов событий - PullRequest
0 голосов
/ 16 января 2019

В настоящее время я работаю над семестровым проектом, где я должен узнать серию из трех событий. Нравится P -> R -> P

У нас есть два разных типа событий, которые используются через соединитель Kafka в одной теме.

Я создал родительский класс с именем Event, из которого происходят два других типа.

Соединитель Kafka десериализует JSON с EventSchema до родительского класса Event.

val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)

Шаблон выглядит так:

val pattern = Pattern
  .begin[Event]("before")
  .subtype(classOf[Position])
  .next("recognized")
  .subtype(classOf[Recognized])
  .next("after")
  .subtype(classOf[Position])

В настоящее время проблема заключается в том, что если я отправлю три сообщения в соответствующем формате, шаблон не будет распознан.

То, что я пробовал еще .. Я изменил шаблон следующим образом:

val pattern = Pattern
  .begin[Event]("before")
  .where(e => e.getType == "position")
  .next("recognized")
  .where(e => e.getType == "recognition")
  .next("after")
  .where(e => e.getType == "position")

Этот шаблон работает, но позже я не могу привести класс Event к позиции или распознаванию.

Что мне здесь не хватает?

1 Ответ

0 голосов
/ 17 января 2019

Согласно комментариям, я думаю, что вы должны возвращать экземпляры подтипа вместо Event. Вот мой пример кода для вас:

val event = mapper.readValue(bytes, classOf[Event])
event.getType match {
  case "position" => mapper.readValue(bytes, classOf[Position])
  case "recognition" => mapper.readValue(bytes, classOf[Recognized])
  case _ =>
}

Я успешно попробовал пример из тестового примера в CEPITCase.java.

DataStream<Event> input = env.fromElements(
  new Event(1, "foo", 4.0),
  new SubEvent(2, "foo", 4.0, 1.0),
  new SubEvent(3, "foo", 4.0, 1.0),
  new SubEvent(4, "foo", 4.0, 1.0),
  new Event(5, "middle", 5.0)
);

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
.followedByAny("middle").subtype(SubEvent.class)
.followedByAny("end").subtype(SubEvent.class);
...