Обход Flink CEP-буфера в потоковой среде EventTime - PullRequest
0 голосов
/ 11 декабря 2018

У меня есть потоковое приложение EventTime, которое использует библиотеку CEP для базового трехшагового шаблона в объединенном потоке.Присоединенный поток представляет собой комбинацию живых данных, водяных знаков и оконных данных и потока исторических элементов за пределами оконного / водяного знака.

Настройка аналогична записи в блоге dataArtisans за исключениемшаблон CEP в качестве последнего шага.

Наша настройка CEP выглядит следующим образом и работала до добавления в исторический поток без меток времени.EscalatingAlertEventIterativeCondition гарантирует, что предыдущее совпадение событий имеет больший уровень, чем следующее.

Pattern<AlertEvent, ?> pattern = Pattern.<AlertEvent>
        begin("one")
        .where((AlertEvent event) -> event.level > 0)
        .next("two")
        .where(new EscalatingAlertEventIterativeCondition("one"))
        .next("three")
        .where(new EscalatingAlertEventIterativeCondition("two"));

return CEP.pattern(
        alertEventStream,
        pattern
);

Проблема, с которой я сталкиваюсь, заключается в том, что CEP постоянно буферизует (точки останова внутри фильтра и итерационные условия теперь не достигаются) и что фильтрация / выбор никогда не происходит.Первоначально я думал, что это может быть связано с буфером CEP, но я не уверен, так как я новичок в Flink и Flink CEP.Есть ли способ избежать буфера задержки или что-то еще выглядит неправильно?

Наш график работы, где только самый верхний, живой поток данных помечается временем и помечается водяными знаками: enter image description here

...