Пойдем пошагово.
1) appName("does this matter?")
в вашем случае не имеет значения
2) spark.read.textFile(filename)
безопасно из-за своей лени, файл не будетбыть загруженным в вашу память
Теперь о реализации:
Spark - это преобразование данных, поэтому вам нужно подумать, как преобразовать необработанные твиты в список уникальных упоминаний в каждом твите.Затем вы преобразуете список упоминаний в Map[Mention, Int]
, где Int
- это общее количество этих упоминаний в СДР.
Преобразование обычно выполняется с помощью метода map(f: A => B)
, где f
- отображение функции A
значение до B
.
def tweetToMentions(tweet: String): Seq[String] =
tweet.split(" ").collect {
case s if s.startsWith("@") => s.replaceAll("[,.;!?]", "").toLowerCase
}.distinct.Seq
val mentions = tweetToMentions("Hey @Honda, I am @customer I love @honda. I am favorite @CUSTOMER.")
// mentions: Seq("@honda", "@customer")
Следующим шагом является применение этой функции к каждому элементу в нашем СДР:
val mentions = tweetsFile.flatMap(tweetToMentions)
Обратите внимание, что мы используем flatMap
вместоmap
, поскольку tweetToMentions
возвращает Seq[String]
, и мы хотим, чтобы наш СДР содержал только упоминания, flatMap
сгладит результат.
Для подсчета вхождений каждого упоминания в СДР нам нужно применить магию:
Сначала мы map
упомянем наши пары (Mention, 1)
mentions.map(mention => (mention, 1))
Затем мы используем reduceByKey
, который будет подсчитывать, сколько раз каждое упоминание упоминаетсяпроисходит в нашем RDD.Наконец, мы упорядочиваем упоминания по их количеству и полученным результатам.
val result = mentions
.map(mention => (mention, 1))
.reduceByKey((a, b) => a + b)
.takeOrdered(20)(Ordering[Int].reverse.on(_.2))