Как создать пакетное или скользящее окно, используя Flink CEP? - PullRequest
0 голосов
/ 05 октября 2018

Я только начинаю с Flink CEP и пришел с Esper CEP движком.Как вы ( или не ) знаете, в Esper с использованием их синтаксиса (EPL) вы можете легко создать окно batch или slide, группируя события в этих окнах и позволяя вамиспользовать это событие с функциями (avg, max, min, ...).

Например, с помощью следующего шаблона вы можете создать пакетные окна за 5 секунд и рассчитать среднее значение атрибута price из всех Stock событий, которые вы получили в указанном окне.

select avg(price) from Stock#time_batch(5 sec)

Дело в том, что я хотел бы знать, как реализовать это на Flink CEP.Я знаю, что, возможно, цель или подход в Flink CEP отличаются, поэтому способ реализации этого может быть не таким простым, как в Esper CEP.

Я взглянул на docs относительно временных окон, но я не могу реализовать эти окна вместе с Flink CEP.Итак, учитывая следующий код:

DataStream<Stock> stream = ...; // Consume events from Kafka

// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
            .where(
                    new SimpleCondition<Stock>() {
                        public boolean filter(Stock event) {
                            return event.getPrice() >= 0;
                        }
                    }
            );

PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);

/**
  CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
  I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
  GREATER THAN A THREESHOLD, AN ALERT IS DETECTED

  return avg(allEventsInWindow.getPrice()) > 1;
*/  

DataStream<Alert> result = patternStream.select(
            new PatternSelectFunction<Stock, Alert>() {
                @Override
                public Alert select(Map<String, List<Stock>> pattern) throws Exception {
                    return new Alert(pattern.toString());
                }
            }
    );

Как мне создать окно, в котором из первого полученного я начинаю вычислять среднее значение для следующих событий в течение 5 секунд.Например:

t = 0 seconds 
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds     (...end of batch window...)
Avg = 1.5 => Alert detected!

Среднее значение через 5 секунд составит 1,5 и вызовет предупреждение.Как я могу написать это?

Спасибо!

1 Ответ

0 голосов
/ 06 октября 2018

С библиотекой Флинка CEP это поведение невозможно выразить.Я бы скорее рекомендовал использовать Flink's DataStream или Table API для расчета средних значений.Исходя из этого, вы можете снова использовать CEP для генерации других событий.

final DataStream<Stock> input = env
    .fromElements(
            new Stock(1L, 1.0),
            new Stock(2L, 2.0),
            new Stock(3L, 1.0),
            new Stock(4L, 2.0))
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
        @Override
        public long extractTimestamp(Stock element) {
            return element.getTimestamp();
        }
    });

final DataStream<Double> windowAggregation = input
    .timeWindowAll(Time.milliseconds(2))
    .aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {
        @Override
        public Tuple2<Integer, Double> createAccumulator() {
            return Tuple2.of(0, 0.0);
        }

        @Override
        public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
            return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
        }

        @Override
        public Double getResult(Tuple2<Integer, Double> accumulator) {
            return accumulator.f1 / accumulator.f0;
        }

        @Override
        public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    });

final DataStream<Double> result = windowAggregation.filter((FilterFunction<Double>) value -> value > THRESHOLD);
...