CEP больше для шаблонов, которые повторяются, и здесь он не работает. Стратегии пропуска, о которых вы говорите, на самом деле называются «стратегиями пропуска после матча», подразумевая, что вы хотите сопоставить шаблон больше, чем один.
Я думаю, что лучший способ сделать это - использовать функцию процесса, например:
class AlertUserOnce extends ProcessFunction[element, element] {
lazy val turnOff: ValueState[Boolean] = getRuntimeContext
.getState(new ValueStateDescriptor[Boolean]("turn-function-off", classOf[Boolean]))
override def processElement(value: element, ctx: Context, out: Collector[element]): Unit = {
if (turnOff.value == null && element.downloads > 1000) {
alertUser(value)
turnOff.update(true)
}
out.collect(value)
}
}
...
stream.keyBy(...).process(new AlertUserOnce)