Как сопоставить поток данных с потоком паттернов - PullRequest
0 голосов
/ 08 октября 2019

У меня есть 2 потока kafka один (keyedStream) для DataStream позже (keyedPatStream) для Pattern Stream. Мой вопрос, как я могу сравнить два?

var keyedStream = src.map(v => v.get("value"))
      .map {
        v =>val loc = v.get("locationID").asText()
          val temp = v.get("temp").asDouble()
          (loc, temp)
      }
      .keyBy(v => v._1)

    val patStream : DataStream[(ObjectNode)] = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("pattern", new JSONKeyValueDeserializationSchema(false), properties))

    var keyedPatStream = patStream.map( r => r.get("value"))
        .map{
          v => val locate = v.get("locationID").asText()
            val temper = v.get("temp").asDouble()
            (locate, temper)
        }
        .keyBy( v => v._1)

Пожалуйста, скажите мне, как я могу сопоставить два, я использовал это

    val pat = Pattern
      .begin[(String, Double)]("start")
      .where(_._2 > TEMPERATURE_THRESHOLD)  //1

, когда я сравнивал со статическим значением, а именно(TEMPERATURE_THRESHOLD). Но я понятия не имею, как бороться с потоком. Любая помощь будет оценена

...