Вам необходимо выполнить соединение с Кассандрой, используя функцию joinWithCassandraTable
...
Чтобы быть более эффективным, вам может потребоваться переразметить ваш RDD, полученный от Kafka, чтобы он соответствовал разделам в таблице Кассандры. Код может выглядеть так:
val resultRdd = kafkaRDD.repartitionByCassandraReplica("ks","emails")
.joinWithCassandraTable("ks","emails")
И после этого вы можете анализировать, совпадают ли имена и т. Д. И после объединения вы должны получать только записи, для которых в Кассандре есть электронные письма ...