Как мне написать отдельное приложение в Spark, чтобы найти 20 большинства упоминаний в текстовом файле, заполненном извлеченными твитами - PullRequest
0 голосов
/ 04 апреля 2019

Я создаю отдельное приложение в spark, где мне нужно прочитать текстовый файл, заполненный твитами. Каждое упоминание начинается с символа «@». Цель состоит в том, чтобы просмотреть этот файл и найти наиболее 20 упоминаний. Пунктуация должна быть удалена из всех упоминаний, и если твит содержит одно и то же упоминание более одного раза, он должен учитываться только один раз. В одном твите может быть несколько уникальных упоминаний. В файле много твитов.

Я новичок в scala и apache-spark. Я думал об использовании функции фильтра и размещении результатов в списке. Затем преобразовать список в набор, где элементы являются уникальными. Но синтаксис, регулярные выражения и чтение файла - проблема, с которой я сталкиваюсь.

def main(args: Array[String]){
   val locationTweetFile = args(0)
   val spark = SparkSession.builder.appName("does this matter?").getOrCreate()

Файл чириканья огромен, эта команда ниже, безопасна?

val tweetsFile = spark.read.textFile(locationTweetFile).cache()
val mentionsExp = """([@])+""".r

}

Если бы твиттер сказал «Привет @Honda, я @ клиент, я люблю @honda. Я любимый @CUSTOMER». Тогда на выходе должно быть что-то вроде ((honda, 1), (customer, 1))

Поскольку существует несколько твитов, другой твит может сказать: "@HoNdA Я такой же @cuSTomER @STACKEXCHANGE." Тогда конечный результат будет что-то вроде ((Honda, 2), (заказчик, 2), (stackexchange, 1))

1 Ответ

1 голос
/ 05 апреля 2019

Пойдем пошагово.

1) appName("does this matter?") в вашем случае не имеет значения

2) spark.read.textFile(filename) безопасно из-за своей лени, файл не будетбыть загруженным в вашу память

Теперь о реализации:

Spark - это преобразование данных, поэтому вам нужно подумать, как преобразовать необработанные твиты в список уникальных упоминаний в каждом твите.Затем вы преобразуете список упоминаний в Map[Mention, Int], где Int - это общее количество этих упоминаний в СДР.

Преобразование обычно выполняется с помощью метода map(f: A => B), где f - отображение функции A значение до B.

def tweetToMentions(tweet: String): Seq[String] =
  tweet.split(" ").collect {
    case s if s.startsWith("@") => s.replaceAll("[,.;!?]", "").toLowerCase
  }.distinct.Seq

val mentions = tweetToMentions("Hey @Honda, I am @customer I love @honda. I am favorite @CUSTOMER.")
// mentions: Seq("@honda", "@customer")

Следующим шагом является применение этой функции к каждому элементу в нашем СДР:

val mentions = tweetsFile.flatMap(tweetToMentions)

Обратите внимание, что мы используем flatMap вместоmap, поскольку tweetToMentions возвращает Seq[String], и мы хотим, чтобы наш СДР содержал только упоминания, flatMap сгладит результат.

Для подсчета вхождений каждого упоминания в СДР нам нужно применить магию:

Сначала мы map упомянем наши пары (Mention, 1)

mentions.map(mention => (mention, 1))

Затем мы используем reduceByKey, который будет подсчитывать, сколько раз каждое упоминание упоминаетсяпроисходит в нашем RDD.Наконец, мы упорядочиваем упоминания по их количеству и полученным результатам.

val result = mentions
  .map(mention => (mention, 1))
  .reduceByKey((a, b) => a + b)
  .takeOrdered(20)(Ordering[Int].reverse.on(_.2))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...