Как сделать Pattern и предупредить о следующем - PullRequest
0 голосов
/ 05 октября 2019

У меня есть этот код, который дает locationID и temp, я хочу шаблон, который выдает предупреждение всякий раз, когда temp> THRESHOLD_TEMPERATURE

Я пробовал: -

        val pattern1: Pattern[Event,_] = Pattern.begin[Event]("first")
          .subtype(Event.getClass)
          .where( (evt -> evt.getTemp()) >= TEMPERATURE_THRESHOLD)
          .within(Time.seconds(5))

        val patternStream: PatternStream[Event] = CEP.pattern(f,pattern1)
       val alerts: DataStream[String] = patternStream.flatSelect(
          (in: Map[String,String], out: Collector[String]) => {
            var first: String = in.get("first")
            if (first >= TEMPERATURE_THRESHOLD){

               out.collect("Temperature above danger zone")
           }
          }
        )

Этокод, для которого должно быть сделано предупреждение: -

case class Event(locationID: String, temp: Double)

    val TEMPERATURE_THRESHOLD: Double = 50.00

    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("bootstrap.servers", "localhost:9092")


    val src = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("broadcast",
      new JSONKeyValueDeserializationSchema(false), properties))


    var ask = src.map{
      r => r.get("value")
      }


    var data = ask.map { v => {
      val loc = v.get("locationID").asInstanceOf[String]
      val temperature = v.get("temp").asDouble()

      (loc, temperature)
    }}
//    data.print()

    var f = data.keyBy(
        v => v._2
      )

    f.print()
    see.execute()

шаблон перегружается, также flatSelect

1 Ответ

0 голосов
/ 05 октября 2019

CEP предназначен для обнаружения паттернов в последовательностях событий, которые на самом деле не соответствуют этой конкретной проблеме. Поиск событий, где temp> THRESHOLD не требует сопоставления с образцом - простой фильтр или плоская карта выполнят эту работу. Например, у вас может быть плоская карта, которая преобразует каждое событие, когда температура слишком высока, для предупреждения и игнорирует все другие события.

Что касается вашего решения на основе CEP, я не понимаю, что вы говоритеидет не так с этим. Но пара вещей выглядит неправильно. Предложение within ничего не будет делать: поскольку ваши шаблоны имеют длину только одно событие, они не имеют никакой продолжительности. И вы набираете поток по температуре, которая кажется странной.

...