Scala: разделение данных, поступающих от kafka через DStream - PullRequest
0 голосов
/ 06 июля 2018

Я получаю данные от кафки в виде

{"email":"test@example","firstname":"Example","lastname":"User"}

Я хочу получить доступ к идентификатору и имени электронной почты и сравнить его с данными, поступающими с Кассандры, в виде:

CassandraRow{email: abc@xyz.com}

1 Ответ

0 голосов
/ 07 июля 2018

Вам необходимо выполнить соединение с Кассандрой, используя функцию joinWithCassandraTable ...

Чтобы быть более эффективным, вам может потребоваться переразметить ваш RDD, полученный от Kafka, чтобы он соответствовал разделам в таблице Кассандры. Код может выглядеть так:

val resultRdd = kafkaRDD.repartitionByCassandraReplica("ks","emails")
   .joinWithCassandraTable("ks","emails")

И после этого вы можете анализировать, совпадают ли имена и т. Д. И после объединения вы должны получать только записи, для которых в Кассандре есть электронные письма ...

...