У меня есть этот код, который дает locationID и temp, я хочу шаблон, который выдает предупреждение всякий раз, когда temp> THRESHOLD_TEMPERATURE
Я пробовал: -
val pattern1: Pattern[Event,_] = Pattern.begin[Event]("first")
.subtype(Event.getClass)
.where( (evt -> evt.getTemp()) >= TEMPERATURE_THRESHOLD)
.within(Time.seconds(5))
val patternStream: PatternStream[Event] = CEP.pattern(f,pattern1)
val alerts: DataStream[String] = patternStream.flatSelect(
(in: Map[String,String], out: Collector[String]) => {
var first: String = in.get("first")
if (first >= TEMPERATURE_THRESHOLD){
out.collect("Temperature above danger zone")
}
}
)
Этокод, для которого должно быть сделано предупреждение: -
case class Event(locationID: String, temp: Double)
val TEMPERATURE_THRESHOLD: Double = 50.00
val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("bootstrap.servers", "localhost:9092")
val src = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("broadcast",
new JSONKeyValueDeserializationSchema(false), properties))
var ask = src.map{
r => r.get("value")
}
var data = ask.map { v => {
val loc = v.get("locationID").asInstanceOf[String]
val temperature = v.get("temp").asDouble()
(loc, temperature)
}}
// data.print()
var f = data.keyBy(
v => v._2
)
f.print()
see.execute()
шаблон перегружается, также flatSelect