Получил повторяющиеся значения в redis, даже если if ... else для фильтрации в SparkStreaming - PullRequest
0 голосов
/ 28 января 2019

Когда я использовал redis в процессе потоковой обработки искры, блок if ... else в foreachRDD, кажется, не работает должным образом. Не знаю почему. Имеет ли он какое-то отношение к Spark RDD?Надеюсь на помощь!

Часть кодов в искровом посту ниже. Для того, чтобы рассчитать скорость поезда непрерывно.Я использовал Redis, чтобы сохранить последние две партии координат и времени.Поскольку некоторые хронологические партии источника данных могут иметь одинаковую временную метку, поэтому я добавляю блок if ... else, чтобы избежать сохранения двух одинаковых временных меток.И результат в Redis все еще получил то же самое «Время», а другое значение, такое как Altitude1, равняется Altitude2.

val message = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Constants.inputKafkaTopics, GetKafka.getKafkaConfig())
    )

    message.foreachRDD{rdd=>if(!rdd.isEmpty()) {
    val signal = rdd.map { r =>
      var alarmSignal: AlarmSignal = null
      try {
        val signal: JSONObject = JSON.parseObject(r.value())
        alarmSignal = AlarmSignal.parseFromJson(signal)
      } catch {
        case e: Exception => {
          e.printStackTrace()
        }
      }
      alarmSignal
    }.cache()
    val result = signal.map{r=>    
    if(null != r.getLatitude && null != r.getLongitude && null != r.getAltitude){
            val coordinate : String = "coordinateByCode:"+r.getCode
            if(!jedis.exists(coordinate)){
              jedis.hset(coordinate,"Altitude1",r.getAltitude)
              jedis.hset(coordinate,"Longitude1",r.getLongitude)
              jedis.hset(coordinate,"Latitude1",r.getLatitude)
              jedis.hset(coordinate,"Time1",r.getTime)
              jedis.hset(coordinate,"Altitude2",r.getAltitude)
              jedis.hset(coordinate,"Longitude2",r.getLongitude)
              jedis.hset(coordinate,"Latitude2",r.getLatitude)
              jedis.hset(coordinate,"Time2",r.getTime)
            }else if(jedis.exists(coordinate)){
              if(!jedis.hget(coordinate,"Time2").equals(r.getTime)){
                jedis.hset(coordinate,"Altitude1",jedis.hget(coordinate,"Altitude2"))
                jedis.hset(coordinate,"Longitude1",jedis.hget(coordinate,"Longitude2"))
                jedis.hset(coordinate,"Latitude1",jedis.hget(coordinate,"Latitude2"))
                jedis.hset(coordinate,"Time1",jedis.hget(coordinate,"Time2"))
                jedis.hset(coordinate,"Altitude2",r.getAltitude)
                jedis.hset(coordinate,"Longitude2",r.getLongitude)
                jedis.hset(coordinate,"Latitude2",r.getLatitude)
                jedis.hset(coordinate,"Time2",r.getTime)
              }
            }
        }
}

Ну, тот же код сохраняет правильные значения в Redis (без повторяющихся значений) в тестовом коде Java.

...