Я только начинаю с Flink CEP
и пришел с Esper CEP
движком.Как вы ( или не ) знаете, в Esper
с использованием их синтаксиса (EPL
) вы можете легко создать окно batch
или slide
, группируя события в этих окнах и позволяя вамиспользовать это событие с функциями (avg, max, min, ...).
Например, с помощью следующего шаблона вы можете создать пакетные окна за 5 секунд и рассчитать среднее значение атрибута price
из всех Stock
событий, которые вы получили в указанном окне.
select avg(price) from Stock#time_batch(5 sec)
Дело в том, что я хотел бы знать, как реализовать это на Flink CEP
.Я знаю, что, возможно, цель или подход в Flink CEP
отличаются, поэтому способ реализации этого может быть не таким простым, как в Esper CEP
.
Я взглянул на docs относительно временных окон, но я не могу реализовать эти окна вместе с Flink CEP
.Итак, учитывая следующий код:
DataStream<Stock> stream = ...; // Consume events from Kafka
// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
.where(
new SimpleCondition<Stock>() {
public boolean filter(Stock event) {
return event.getPrice() >= 0;
}
}
);
PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);
/**
CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
GREATER THAN A THREESHOLD, AN ALERT IS DETECTED
return avg(allEventsInWindow.getPrice()) > 1;
*/
DataStream<Alert> result = patternStream.select(
new PatternSelectFunction<Stock, Alert>() {
@Override
public Alert select(Map<String, List<Stock>> pattern) throws Exception {
return new Alert(pattern.toString());
}
}
);
Как мне создать окно, в котором из первого полученного я начинаю вычислять среднее значение для следующих событий в течение 5 секунд.Например:
t = 0 seconds
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds (...end of batch window...)
Avg = 1.5 => Alert detected!
Среднее значение через 5 секунд составит 1,5 и вызовет предупреждение.Как я могу написать это?
Спасибо!