Как выполнить поиск в потоковой структурированной искре, используя Redis в качестве таблицы поиска и kafka в качестве источника ввода - PullRequest
0 голосов
/ 10 апреля 2019

Мое требование - прочитать объекты json из kafka и для каждой записи выполнить поиск в кэше redis и сохранить обогащенные данные в mongo db. Я могу читать данные из Кафки как структурированный поток, а также могу сохранить данные в Монго БД, но Я не могу выполнить поиск Redis в структурированном потоке. Версия Spark - 2.4.1 с использованием разъема spark-redis - com.redislabs.provider.redis

На данный момент, прежде чем приступить к поиску, я помещаю полный кэш redis в набор данных, сохраняю его и использую объединение для поиска. этот подход работает нормально, но мой кэш Redis является динамическим, который может измениться в любой момент.

--- существующий код для загрузки кэша Redis в статический набор данных

val redisServer = "127.0.0.1"

val redisPortNum = 6379

val redisConfig = new    RedisConfig(RedisEndpoint(redisServer,redisPortNum))

val keysRDD = spark.sparkContext.fromRedisKeyPattern(DeviceID, 5)(redisConfig)

val keyValueRDD = keysRDD.getKV

val redisCache = keyValueRDD.toDF

образец входа: (извлеченные поля из JSON)

deviceid, скорость

ahjhfhru12,100

кэш redis (динамический, может меняться в любое время)

DeviceID, имя_устройство, SpeedLimit

ahjhfhru12, тест, 50

Вывод

(будет сохранен в dong монго):

DeviceID, скорость, имя_устройство, SpeedLimit

ahjhfhru12,50, тест, 100

для каждой записи я хочу выполнить поиск в redis, чтобы получить обновленную информацию и обогатить входные данные данными кеша, и я хочу сделать только эту искровую структурированную потоковую передачу.

Может кто-нибудь помочь мне, как сделать динамический поиск в структурированном потоке?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...