Допустим, у меня есть программа Spark Scala с RDD с именем mention_rdd
, которая выглядит следующим образом:
(name, (filename, sum))
...
(Maria, (file0, 3))
(John, (file0, 1))
(Maria, (file1, 6))
(Maria, (file2, 1))
(John, (file2, 3))
...
Где у нас есть имена файлов и количество вхождений для каждого имени.
Я хочу уменьшить и найти для каждого имени имя файла с максимальным количеством вхождений. Например:
(name, (filename, max(sum))
...
(Maria, (file1, 6))
(John, (file2, 3))
...
Я пытался получить доступ к кортежу (filename,sum)
RDD самостоятельно, поэтому я могу уменьшить его на name
(из-за ошибки, в которой говорилось, что я не могу пройти из mention_rdd
, потому что (String,Int)
не является типом TraversableOnce
):
val output = mention_rdd.flatMap(file_counts => file_counts._2.map(file_counts._2._1, file_counts._2._2))
.reduceByKey((a, b) => if (a > b) a else b)
Но я получил сообщение об ошибке: карта значений не является членом (String, Int )
Можно ли это сделать в Spark? И если да, то как? Был ли мой подход ошибочным с самого начала?