Flink - поведение разOrMore - PullRequest
       89

Flink - поведение разOrMore

0 голосов
/ 28 февраля 2019

Я хочу найти шаблон событий, которые следуют

Внутренний шаблон:

  1. Имеют то же значение для клавиши "sensorArea".
  2. Имеют другое значениедля ключа "customerId".
  3. Находятся в пределах 5 секунд друг от друга.

И для этого шаблона необходимо

  1. Извещать "только", если предыдущийслучается 3 или более раз.

Я написал что-то, но я точно знаю, что это не завершено.

Два вопроса

  1. Мне нужно получить доступ к полям предыдущих событий, когда я нахожусь в "следующем" шаблоне, как я могу это сделатьбез использования команды ctx, потому что она тяжелая ..

  2. Мой код дает странный результат - это мой ввод

enter image description here

, и мой выходной сигнал равен

3> {first=[Customer[timestamp=50,customerId=111,toAdd=2,sensorData=33]], second=[Customer[timestamp=100,customerId=222,toAdd=2,sensorData=33], Customer[timestamp=600,customerId=333,toAdd=2,sensorData=33]]}

, хотя желаемым выходным сигналом должны быть все первые шесть событий (пользователи 111/222 и датчик 33, а затем 44, а затем 55

Pattern<Customer, ?> sameUserDifferentSensor = Pattern.<Customer>begin("first", skipStrategy)
            .followedBy("second").where(new IterativeCondition<Customer>() {
                @Override
                public boolean filter(Customer currCustomerEvent, Context<Customer> ctx) throws Exception {
                    List<Customer> firstPatternEvents = Lists.newArrayList(ctx.getEventsForPattern("first"));
                    int i = firstPatternEvents.size();
                    int currSensorData = currCustomerEvent.getSensorData();
                    int prevSensorData = firstPatternEvents.get(i-1).getSensorData();
                    int currCustomerId = currCustomerEvent.getCustomerId();
                    int prevCustomerId = firstPatternEvents.get(i-1).getCustomerId();
                    return currSensorData==prevSensorData && currCustomerId!=prevCustomerId;
                }
            })
            .within(Time.seconds(5))
            .timesOrMore(3);



    PatternStream<Customer> sameUserDifferentSensorPatternStream = CEP.pattern(customerStream, sameUserDifferentSensor);
    DataStream<String> alerts1 = sameUserDifferentSensorPatternStream.select((PatternSelectFunction<Customer, String>) Object::toString);

1 Ответ

0 голосов
/ 02 марта 2019

Вам будет легче, если вы сначала наберете поток с помощью датчика.Они будут соответствовать шаблону в потоках, где все события предназначены для одной сенсорной области, что облегчит выражение шаблона и сделает сравнение более эффективным.

Вы не можете избежать использования итеративного условия и ctx, но это должно быть дешевле после ввода ключа в поток.

Кроме того, ваш пример кода не соответствует текстовому описанию.В тексте написано «в течение 5 секунд» и «3 или более раз», в то время как код имеет within(Time.seconds(2)) и timesOrMore(2).

...