Я хочу распространить rdd, используя карту списка.
входная выборка
Log("key1", "key2", "key3", Map(tk1 -> tv1, tk2 -> tv2, tk3 -> tv3))
И выходная выборка, которую я хочу:
RDD[(String, String, String, String, String)]
("key1", "key2", "key3", "tk1", "tv1")
("key1", "key2", "key3", "tk2", "tv2")
("key1", "key2", "key3", "tk3", "tv3")
Наконец, я хочувыполнить операцию уменьшения, как показано ниже.Но это не работает.
val mapCnt = logs.map(log => {
log.textMap.foreach { tmap =>
var tkey = tmap._1
var tvalue = tmap._2
}
((log.key1, log.key2, log.key3, tkey, tvalue), 1L)
}).reduceByKey(_ + _)
Вот входной объект, который я использовал.
case class Log(
val key1: String,
val key2: String,
val key3: String,
val TextMap: Map[String, String]
)
Как мне преобразовать это?
Спасибо за вашу помощь.