У меня есть следующий вариант использования.
Есть одна машина, которая отправляет потоки событий в Kafka, которые принимаются CEP engine
, где генерируются предупреждения при выполнении условий для данных потока.
FlinkKafkaConsumer011<Event> kafkaSource = new FlinkKafkaConsumer011<Event>(kafkaInputTopic, new EventDeserializationSchema(), properties);
DataStream<Event> eventStream = env.addSource(kafkaSource);
Событие POJO содержит идентификатор, имя, время, ip.
Машина отправит огромные данные в Kafka, и с машины будет 35 уникальных имен событий (например, name1, name2 ..... name35)) и я хочу обнаружить шаблоны для каждой комбинации имен событий (например, имя1 произошло с именем2, имя1 произошло с именем3 и т. д.).Я получил всего 1225 комбинаций.
Правило POJO содержит e1Name и e2Name.
List<Rule> ruleList -> It contains 1225 rules.
for (Rule rule : ruleList) {
Pattern<Event, ?> warningPattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
if(value.getName().equals(rule.getE1Name())) {
return true;
}
return false;
}
}).followedBy("next").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
if(value.getName().equals(rule.getE2Name())) {
return true;
}
return false;
}
}).within(Time.seconds(30));
PatternStream patternStream = CEP.pattern(eventStream, warningPattern);
}
Это правильный способ выполнения нескольких шаблонов для данных одного потока или существует какой-либо оптимизированный способ для достижения этого.При вышеуказанном подходе мы получаем PartitionNotFoundException
и UnknownTaskExecutorException
и проблемы с памятью.