Обработка нескольких шаблонов в Flink CEP параллельно на одном потоке данных - PullRequest
1 голос
/ 16 мая 2019

У меня есть следующий вариант использования.

Есть одна машина, которая отправляет потоки событий в Kafka, которые принимаются CEP engine, где генерируются предупреждения при выполнении условий для данных потока.

FlinkKafkaConsumer011<Event> kafkaSource = new FlinkKafkaConsumer011<Event>(kafkaInputTopic, new EventDeserializationSchema(), properties);
DataStream<Event> eventStream = env.addSource(kafkaSource);

Событие POJO содержит идентификатор, имя, время, ip.

Машина отправит огромные данные в Kafka, и с машины будет 35 уникальных имен событий (например, name1, name2 ..... name35)) и я хочу обнаружить шаблоны для каждой комбинации имен событий (например, имя1 произошло с именем2, имя1 произошло с именем3 и т. д.).Я получил всего 1225 комбинаций.

Правило POJO содержит e1Name и e2Name.

List<Rule> ruleList -> It contains 1225 rules.

for (Rule rule : ruleList) {
    Pattern<Event, ?> warningPattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {

    @Override
        public boolean filter(Event value) throws Exception {
            if(value.getName().equals(rule.getE1Name())) {
                return true;
            }
            return false;
        }

    }).followedBy("next").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            if(value.getName().equals(rule.getE2Name())) {
                return true;
            }
            return false;
        }
    }).within(Time.seconds(30));
    PatternStream patternStream = CEP.pattern(eventStream, warningPattern);
}

Это правильный способ выполнения нескольких шаблонов для данных одного потока или существует какой-либо оптимизированный способ для достижения этого.При вышеуказанном подходе мы получаем PartitionNotFoundException и UnknownTaskExecutorException и проблемы с памятью.

1 Ответ

0 голосов
/ 20 мая 2019

IMO вам не нужны шаблоны для достижения вашей цели.Вы можете определить функцию сопоставления с состоянием для источника, который отображает имена событий как пары (последние два имени).После этого установите окно на 30 секунд и примените простой пример WordCount к источнику.

Функция отображения с сохранением состояния может выглядеть примерно так (принимая только имя события, вам нужно изменить его в соответствии с именем события ввода -extract и т. Д.):

public class TupleMap implements MapFunction<String, Tuple2<String, Integer>>{
    Tuple2<String, String> latestTuple = new Tuple2<String, String>();

    public Tuple2<String, Integer> map(String value) throws Exception {
        this.latestTuple.f0 = this.latestTuple.f1;
        this.latestTuple.f1 = value;
        return new Tuple2<String, Integer>(this.latestTuple.f0 + this.latestTuple.f1, 1);
    }
}

и результат с событиемПары имен и количество вхождений в виде кортежа можно получить следующим образом (возможно, записано в приемник кафки?):

DataStream<Tuple2<String, Integer>> source = stream.map(new TupleMap());
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(0).timeWindow(Time.seconds(30)).sum(1);
...