У меня есть потоковое приложение EventTime, которое использует библиотеку CEP для базового трехшагового шаблона в объединенном потоке.Присоединенный поток представляет собой комбинацию живых данных, водяных знаков и оконных данных и потока исторических элементов за пределами оконного / водяного знака.
Настройка аналогична записи в блоге dataArtisans за исключениемшаблон CEP в качестве последнего шага.
Наша настройка CEP выглядит следующим образом и работала до добавления в исторический поток без меток времени.EscalatingAlertEventIterativeCondition
гарантирует, что предыдущее совпадение событий имеет больший уровень, чем следующее.
Pattern<AlertEvent, ?> pattern = Pattern.<AlertEvent>
begin("one")
.where((AlertEvent event) -> event.level > 0)
.next("two")
.where(new EscalatingAlertEventIterativeCondition("one"))
.next("three")
.where(new EscalatingAlertEventIterativeCondition("two"));
return CEP.pattern(
alertEventStream,
pattern
);
Проблема, с которой я сталкиваюсь, заключается в том, что CEP постоянно буферизует (точки останова внутри фильтра и итерационные условия теперь не достигаются) и что фильтрация / выбор никогда не происходит.Первоначально я думал, что это может быть связано с буфером CEP, но я не уверен, так как я новичок в Flink и Flink CEP.Есть ли способ избежать буфера задержки или что-то еще выглядит неправильно?
Наш график работы, где только самый верхний, живой поток данных помечается временем и помечается водяными знаками: