ReduceByKey на Итерируемое значение кортежей - PullRequest
0 голосов
/ 15 сентября 2018

Я пытаюсь подсчитать появления определенных предметов на определенную дату.

Структура моего ввода Date\tItem1:AppearencesOfItem1,...,ItemN:AppearencesOfItemN

Пример

20/10/2000\tItem1:1,Item2:5
20/10/2000\tItem1:2
21/10/2000\tItem1:5

Для этого я создаю следующую структуру PairRdd:

[(20/10/2000, (Item1, 1))
(20/10/2000, (Item2, 5))
(20/10/2000, (Item1, 5))
(21/10/2000, (Item1, 5))]

, а затем groupByKey в дату, которая приводит к:

[(20/10/2000, Iterable[(Item1, 1), (Item2, 5), (Item1, 5))
 (21/10/2000, Iterable[(Item1, 5)]

Что я хочупосле этого шага уменьшите значение этих пар и суммируйте появления элементов, имеющих один и тот же ключ, чтобы результат получился таким:

[(20/10/2000, Iterable[(Item1, 6), (Item2, 5))
 (21/10/2000, Iterable[(Item1, 5)]

Однако я не нашел способа уменьшитьна значение этих pairRdds.Во-первых, мой подход неверен?

Ответы [ 2 ]

0 голосов
/ 17 сентября 2018

Может быть достигнуто за два шага:

  1. Суммирование по первым двум столбцам
  2. GroupBy (или уменьшение для производительности) по первому столбцу

    val data = List( 
      ("20/10/2000", "Item1", 1),
      ("20/10/2000", "Item2", 5),
      ("20/10/2000", "Item1", 5),
      ("21/10/2000", "Item1", 5)
    )
    val originalRDD = sparkContext.parallelize(data)
    
    val sumRDD = originalRDD.map(v => ((v._1, v._2), v._3)).reduceByKey(_ + _)
    sumRDD.map(v => ((v._1._1), (v._1._2, v._2))).groupByKey().foreach(println)
    

Выход:

(21/10/2000,CompactBuffer((Item1,5)))
(20/10/2000,CompactBuffer((Item1,6), (Item2,5)))
0 голосов
/ 15 сентября 2018

Надеюсь, это поможет, может быть, он не самый элегантный способ, но, кажется, соответствует вашим требованиям:

rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum))

Сначала сопоставьте ваши значения для группировки по itemId, затем по этому сгруппированному списку сопоставьте значения снова, чтобы сохранить только второй элемент (целое число), чтобы вы могли суммировать его прямо

Выход:

scala> rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum)).foreach(println)
(21/10/2000,Map(Item1 -> 5))
(20/10/2000,Map(Item2 -> 5, Item1 -> 6))

Редактировать Я создавал карту внутри вашего RDD, если вы хотите, чтобы она была в виде списка или что-то еще, просто выполните toList

rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum).toList)
...