При расчете wordsCount
, map(_._2).reduce(_ + _)
более идиоматически выражается методом aggregate
:
val pairs = .... .cache() // (word, wordCount)
val wordsCount = pairs.aggregate(0)((sum, pair) => sum + pair._2, _ + _)
Первый аргумент - это начальное общее количество, которое должно быть равно нулю. Следующие два аргумента находятся в отдельном списке аргументов и выполняются для каждого члена вашей пары RDD . Первый из этого второго набора аргументов ((sum, pair) => sum + pair._2
, называемый оператор последовательности ) добавляет каждое значение в разделе к текущему значению суммы для этого раздела; в то время как второй (_ + _
, называемый оператором ) объединяет счетчики из разных разделов. Основное преимущество заключается в том, что эта операция выполняется параллельно для каждого раздела между данными и гарантирует не дорогостоящее перераспределение (a.k.a. перетасовка ) этих данных. (Перераспределение происходит, когда данные должны передаваться между узлами в кластере, и очень медленно из-за сетевого взаимодействия.)
Хотя aggregate
не влияет на разбиение данных, следует помнить, что, если ваши данные разбиты на разделы, операция map
удалит схему разбиения - из-за вероятности того, что клавиши в паре преобразование может изменить СДР . Это может привести к последующему перераспределению, если вы выполните какие-либо дальнейшие преобразования в результате преобразования map
. Тем не менее, операция map
, за которой следует reduce
, не должна приводить к перераспределению, но может быть несколько менее эффективной из-за дополнительного шага (но не слишком).
Если вы обеспокоены тем, что общее количество слов может превысить тип Int
, вы можете использовать BigInt
вместо:
val wordsCount = pairs.aggregate(BigInt(0))((sum, pair) => sum + pair._2, _ + _)
Что касается создания пары RDD с количеством слов и процентами в качестве значения, вы должны использовать mapValues
вместо map
. И снова mapValues
сохранит любую существующую схему разбиения (поскольку она гарантирует, что ключи не будут изменены при преобразовании), а map
удалит ее. Кроме того, mapValues
проще, так как вам не нужно обрабатывать key значения:
pairs.mapValues(c => (c, c * 100.0 / wordsCount))
Это должно обеспечить достаточную точность для ваших целей. Я бы беспокоился только о округлении и использовании BigDecimal
при получении и выводе значений. Однако, если вам это действительно нужно, ваш код будет выглядеть так:
pairs.mapValues{c =>
(c, BigDecimal(c * 100.0 / wordsCount).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble))
}