Когда я использовал redis в процессе потоковой обработки искры, блок if ... else в foreachRDD, кажется, не работает должным образом. Не знаю почему. Имеет ли он какое-то отношение к Spark RDD?Надеюсь на помощь!
Часть кодов в искровом посту ниже. Для того, чтобы рассчитать скорость поезда непрерывно.Я использовал Redis, чтобы сохранить последние две партии координат и времени.Поскольку некоторые хронологические партии источника данных могут иметь одинаковую временную метку, поэтому я добавляю блок if ... else, чтобы избежать сохранения двух одинаковых временных меток.И результат в Redis все еще получил то же самое «Время», а другое значение, такое как Altitude1, равняется Altitude2.
val message = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Constants.inputKafkaTopics, GetKafka.getKafkaConfig())
)
message.foreachRDD{rdd=>if(!rdd.isEmpty()) {
val signal = rdd.map { r =>
var alarmSignal: AlarmSignal = null
try {
val signal: JSONObject = JSON.parseObject(r.value())
alarmSignal = AlarmSignal.parseFromJson(signal)
} catch {
case e: Exception => {
e.printStackTrace()
}
}
alarmSignal
}.cache()
val result = signal.map{r=>
if(null != r.getLatitude && null != r.getLongitude && null != r.getAltitude){
val coordinate : String = "coordinateByCode:"+r.getCode
if(!jedis.exists(coordinate)){
jedis.hset(coordinate,"Altitude1",r.getAltitude)
jedis.hset(coordinate,"Longitude1",r.getLongitude)
jedis.hset(coordinate,"Latitude1",r.getLatitude)
jedis.hset(coordinate,"Time1",r.getTime)
jedis.hset(coordinate,"Altitude2",r.getAltitude)
jedis.hset(coordinate,"Longitude2",r.getLongitude)
jedis.hset(coordinate,"Latitude2",r.getLatitude)
jedis.hset(coordinate,"Time2",r.getTime)
}else if(jedis.exists(coordinate)){
if(!jedis.hget(coordinate,"Time2").equals(r.getTime)){
jedis.hset(coordinate,"Altitude1",jedis.hget(coordinate,"Altitude2"))
jedis.hset(coordinate,"Longitude1",jedis.hget(coordinate,"Longitude2"))
jedis.hset(coordinate,"Latitude1",jedis.hget(coordinate,"Latitude2"))
jedis.hset(coordinate,"Time1",jedis.hget(coordinate,"Time2"))
jedis.hset(coordinate,"Altitude2",r.getAltitude)
jedis.hset(coordinate,"Longitude2",r.getLongitude)
jedis.hset(coordinate,"Latitude2",r.getLatitude)
jedis.hset(coordinate,"Time2",r.getTime)
}
}
}
}
Ну, тот же код сохраняет правильные значения в Redis (без повторяющихся значений) в тестовом коде Java.