В настоящее время я работаю над семестровым проектом, где я должен узнать серию из трех событий. Нравится P -> R -> P
У нас есть два разных типа событий, которые используются через соединитель Kafka в одной теме.
Я создал родительский класс с именем Event, из которого происходят два других типа.
Соединитель Kafka десериализует JSON с EventSchema до родительского класса Event.
val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)
Шаблон выглядит так:
val pattern = Pattern
.begin[Event]("before")
.subtype(classOf[Position])
.next("recognized")
.subtype(classOf[Recognized])
.next("after")
.subtype(classOf[Position])
В настоящее время проблема заключается в том, что если я отправлю три сообщения в соответствующем формате, шаблон не будет распознан.
То, что я пробовал еще .. Я изменил шаблон следующим образом:
val pattern = Pattern
.begin[Event]("before")
.where(e => e.getType == "position")
.next("recognized")
.where(e => e.getType == "recognition")
.next("after")
.where(e => e.getType == "position")
Этот шаблон работает, но позже я не могу привести класс Event к позиции или распознаванию.
Что мне здесь не хватает?