Улучшение производительности для функции, которая считает общие слова - PullRequest
0 голосов
/ 02 апреля 2019

У меня есть эта программа, которая использует Apache Spark для вычисления частоты слов.

Я создаю RDD с парами ключ / значение (слово = ключ, частота = значение).Набор данных распределяется по рабочим узлам.Функция частая запись выполняется регулярно.Он выбирает строки из файлов.которые затем преобразуются в пары ключ-значение и соединяются с wordDataset-RDD.Подсчитываются слова с частотой> 50.

Мне сказали, что этот подход не эффективен.Может кто-нибудь сказать мне, почему и как я мог улучшить это?

val sc = new SparkContext(...)
var wordDataset:RDD[(String, Int)] = sc.sequenceFile[String, Int](“…”).persist()

def frequentWordCount(fileName:String):Long = {
   val words = sc.sequenceFile[String](fileName)
   val joined = wordDataset.join(words.map(x=>(x,1)))
   joined.filter(x=>x._1._2>50).count
}

Ответы [ 2 ]

0 голосов
/ 02 апреля 2019

Если число общих слов достаточно мало, чтобы поместиться в оперативную память Set, то, что предлагает другой ответ (за исключением того, что вам нужно map(_._1) там после filter.

В противном случае вы можете улучшить две вещи: (1) фильтр перед присоединением, вы хотите выбросить дополнительные данные, как только сможете, вместо ненужного сканирования их несколько раз, и (2) Как правило, вы всегда хотите объединить больший набор данных с меньшим, а не наоборот.

 sc.sequenceFile[String](fileName)
   .keyBy(identity)
   .join(wordDataset.filter(_._2 > 50))
   .count
0 голосов
/ 02 апреля 2019

Приблизительно, сколько у вас будет частых слов?Для многих разумных задач я думаю, что он должен быть неожиданно маленьким - достаточно маленьким, чтобы поместиться в память каждой отдельной машины.IIRC, слова, как правило, подчиняются распределению по степенному закону, поэтому не должно быть так много «обычных» слов.В этом случае вещание набора частых слов может быть намного быстрее, чем присоединение:

val sc = new SparkContext(...)
var commonWords: BroadCast[Set[String]] = sc.broadcast(sc.sequenceFile[String, Int](“…”).filter(_._2 > 50).collect().toSet)

def frequentWordCount(fileName:String):Long = {
   val words = sc.sequenceFile[String](fileName)
   words.filter(commonWords.value.contains).count
}

Если вы звоните frequentWordCount несколько раз, вероятно, также лучше сделать это всего за одну операцию RDD, гдеваши слова ассоциируются с именем файла, а затем группируются и подсчитываются или что-то в этом роде ... особенности зависят от того, как оно используется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...