Apache Flink: как использовать стратегию пропуска? - PullRequest
0 голосов
/ 26 апреля 2018

Я хочу пропустить соответствующее событие после первого сопоставления с образцом.

Как мне использовать стратегию пропуска Флинка?

У меня есть простой сценарий, если downloads > 1000, тогда дать предупреждение пользователю.

В моей реализации, после первого совпадения, он постоянно выдает предупреждение из-за увеличения количества загрузок после 1000.

Как я могу пропустить все оповещения после первого матча?

Я прочитал документы по стратегии пропуска, но пример или реализация помогут мне.

1 Ответ

0 голосов
/ 01 августа 2018

CEP больше для шаблонов, которые повторяются, и здесь он не работает. Стратегии пропуска, о которых вы говорите, на самом деле называются «стратегиями пропуска после матча», подразумевая, что вы хотите сопоставить шаблон больше, чем один.

Я думаю, что лучший способ сделать это - использовать функцию процесса, например:

class AlertUserOnce extends ProcessFunction[element, element] {

  lazy val turnOff: ValueState[Boolean] = getRuntimeContext
     .getState(new ValueStateDescriptor[Boolean]("turn-function-off", classOf[Boolean]))


  override def processElement(value: element, ctx: Context, out: Collector[element]): Unit = {
   if (turnOff.value == null && element.downloads > 1000) {
     alertUser(value)
     turnOff.update(true)
    }
    out.collect(value)
  }
}

...

stream.keyBy(...).process(new AlertUserOnce) 
...