Мое требование - прочитать объекты json из kafka и для каждой записи выполнить поиск в кэше redis и сохранить обогащенные данные в mongo db.
Я могу читать данные из Кафки как структурированный поток, а также могу сохранить данные в Монго БД, но
Я не могу выполнить поиск Redis в структурированном потоке.
Версия Spark - 2.4.1
с использованием разъема spark-redis - com.redislabs.provider.redis
На данный момент, прежде чем приступить к поиску, я помещаю полный кэш redis в набор данных, сохраняю его и использую объединение для поиска. этот подход работает нормально, но мой кэш Redis является динамическим, который может измениться в любой момент.
--- существующий код для загрузки кэша Redis в статический набор данных
val redisServer = "127.0.0.1"
val redisPortNum = 6379
val redisConfig = new RedisConfig(RedisEndpoint(redisServer,redisPortNum))
val keysRDD = spark.sparkContext.fromRedisKeyPattern(DeviceID, 5)(redisConfig)
val keyValueRDD = keysRDD.getKV
val redisCache = keyValueRDD.toDF
образец входа: (извлеченные поля из JSON)
deviceid, скорость
ahjhfhru12,100
кэш redis (динамический, может меняться в любое время)
DeviceID, имя_устройство, SpeedLimit
ahjhfhru12, тест, 50
Вывод
(будет сохранен в dong монго):
DeviceID, скорость, имя_устройство, SpeedLimit
ahjhfhru12,50, тест, 100
для каждой записи я хочу выполнить поиск в redis, чтобы получить обновленную информацию и обогатить входные данные данными кеша, и я хочу сделать только эту искровую структурированную потоковую передачу.
Может кто-нибудь помочь мне, как сделать динамический поиск в структурированном потоке?