Спарк рассчитать процент появления слов - PullRequest
0 голосов
/ 15 мая 2018

У меня есть такой PairRDD (word, wordCount). Теперь мне нужно рассчитать для каждого слова процент появлений от общего количества слов, получая итоговую PairRDD, подобную этой (word, (wordCount, процент)).

Я пробовал с:

val pairs = .... .cache() // (word, wordCount)
val wordsCount = pairs.map(_._2).reduce(_+_)
pairs.map{
        case (word, count) => (word, (count, BigDecimal(count/wordsCount.toDouble * 100).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble))
      }

Но это не кажется очень эффективным (я новичок). Есть ли лучший способ сделать это?

1 Ответ

0 голосов
/ 16 мая 2018

При расчете wordsCount, map(_._2).reduce(_ + _) более идиоматически выражается методом aggregate:

val pairs = .... .cache() // (word, wordCount)
val wordsCount = pairs.aggregate(0)((sum, pair) => sum + pair._2, _ + _)

Первый аргумент - это начальное общее количество, которое должно быть равно нулю. Следующие два аргумента находятся в отдельном списке аргументов и выполняются для каждого члена вашей пары RDD . Первый из этого второго набора аргументов ((sum, pair) => sum + pair._2, называемый оператор последовательности ) добавляет каждое значение в разделе к текущему значению суммы для этого раздела; в то время как второй (_ + _, называемый оператором ) объединяет счетчики из разных разделов. Основное преимущество заключается в том, что эта операция выполняется параллельно для каждого раздела между данными и гарантирует не дорогостоящее перераспределение (a.k.a. перетасовка ) этих данных. (Перераспределение происходит, когда данные должны передаваться между узлами в кластере, и очень медленно из-за сетевого взаимодействия.)

Хотя aggregate не влияет на разбиение данных, следует помнить, что, если ваши данные разбиты на разделы, операция map удалит схему разбиения - из-за вероятности того, что клавиши в паре преобразование может изменить СДР . Это может привести к последующему перераспределению, если вы выполните какие-либо дальнейшие преобразования в результате преобразования map. Тем не менее, операция map, за которой следует reduce, не должна приводить к перераспределению, но может быть несколько менее эффективной из-за дополнительного шага (но не слишком).

Если вы обеспокоены тем, что общее количество слов может превысить тип Int, вы можете использовать BigInt вместо:

val wordsCount = pairs.aggregate(BigInt(0))((sum, pair) => sum + pair._2, _ + _)

Что касается создания пары RDD с количеством слов и процентами в качестве значения, вы должны использовать mapValues вместо map. И снова mapValues сохранит любую существующую схему разбиения (поскольку она гарантирует, что ключи не будут изменены при преобразовании), а map удалит ее. Кроме того, mapValues проще, так как вам не нужно обрабатывать key значения:

pairs.mapValues(c => (c, c * 100.0 / wordsCount))

Это должно обеспечить достаточную точность для ваших целей. Я бы беспокоился только о округлении и использовании BigDecimal при получении и выводе значений. Однако, если вам это действительно нужно, ваш код будет выглядеть так:

pairs.mapValues{c =>
  (c, BigDecimal(c * 100.0 / wordsCount).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble))
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...