Почему я не могу сделать из этого ключевой поток - PullRequest
1 голос
/ 03 октября 2019

Я пробовал разные методы, но не смог получить права. что не так в этом коде?

  def main(args: Array[String]): Unit = {
    val TEMPERATURE_THRESHOLD: Double = 50.00

    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    val src  = see.addSource(new FlinkKafkaConsumer010("broadcast",
      new JSONKeyValueDeserializationSchema(true), properties))

    src.map {
      v =>
        val key = v.get("locationID").asText
        val temperature = v.get("temp").asDouble()
        (key, temperature)
    }
        .keyBy(v => v._2)


    src.print()
    see.execute()

    case class Event(locationID: String, temp: Double)

Мне нужно уведомление, когда мигание считывает значение выше порогового значения

[{"locationID": "ASK","temp": 35},
{"locationID": "BC","temp": 45},
{"locationID":"CHD","temp": 55},
{"locationID": "RAJ","temp": 65},
{"locationID": "EGY","temp": 55}]

Ошибка, которую я получаю:

Ошибка выполнения задания

...