У меня есть 2 потока kafka один (keyedStream) для DataStream позже (keyedPatStream) для Pattern Stream. Мой вопрос, как я могу сравнить два?
var keyedStream = src.map(v => v.get("value"))
.map {
v =>val loc = v.get("locationID").asText()
val temp = v.get("temp").asDouble()
(loc, temp)
}
.keyBy(v => v._1)
val patStream : DataStream[(ObjectNode)] = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("pattern", new JSONKeyValueDeserializationSchema(false), properties))
var keyedPatStream = patStream.map( r => r.get("value"))
.map{
v => val locate = v.get("locationID").asText()
val temper = v.get("temp").asDouble()
(locate, temper)
}
.keyBy( v => v._1)
Пожалуйста, скажите мне, как я могу сопоставить два, я использовал это
val pat = Pattern
.begin[(String, Double)]("start")
.where(_._2 > TEMPERATURE_THRESHOLD) //1
, когда я сравнивал со статическим значением, а именно(TEMPERATURE_THRESHOLD). Но я понятия не имею, как бороться с потоком. Любая помощь будет оценена