Уменьшить и суммировать кортежи по ключу - PullRequest
0 голосов
/ 19 октября 2018

В моем приложении Spark Scala у меня есть СДР в следующем формате:

(05/05/2020, (name, 1))
(05/05/2020, (name, 1))
(05/05/2020, (name2, 1))
...
(06/05/2020, (name, 1))

Я хочу сгруппировать эти элементы по дате и суммировать кортежи с тем же «именем», что и ключ..

Ожидаемый результат:

(05/05/2020, List[(name, 2), (name2, 1)]),
(06/05/2020, List[(name, 1)])
...

Для этого в настоящее время я использую операцию groupByKey и некоторые дополнительные преобразования для группировки кортежейпо ключу и вычислите сумму для тех, у кого один и тот же.

По соображениям производительности я хотел бы заменить эту операцию groupByKey на reduceByKey или aggregateByKey, чтобы уменьшить суммуданных, передаваемых по сети.

Однако я не могу понять, как это сделать.Оба эти преобразования принимают в качестве параметра функцию между значениями (кортежи в моем случае), поэтому я не вижу, как я могу сгруппировать кортежи по ключу, чтобы вычислить их сумму.

Это выполнимо?

Ответы [ 3 ]

0 голосов
/ 20 октября 2018

Вот как вы можете объединить свои кортежи, используя reduceByKey:

/**
File /path/to/file1:
15/04/2010  name
15/04/2010  name
15/04/2010  name2
15/04/2010  name2
15/04/2010  name3
16/04/2010  name
16/04/2010  name

File /path/to/file2:
15/04/2010  name
15/04/2010  name3
**/

import org.apache.spark.rdd.RDD

val filePaths = Array("/path/to/file1", "/path/to/file2").mkString(",")

val rdd: RDD[(String, (String, Int))] = sc.textFile(filePaths).
  map{ line =>
    val pair = line.split("\\t", -1)
    (pair(0), (pair(1), 1))
  }

rdd.
  map{ case (k, (n, v)) => (k, Map(n -> v)) }.
  reduceByKey{ (acc, m) =>
    acc ++ m.map{ case (n, v) => (n -> (acc.getOrElse(n, 0) + v)) }
  }.
  map(x => (x._1, x._2.toList)).
  collect
// res1: Array[(String, List[(String, Int)])] = Array(
//   (15/04/2010, List((name,3), (name2,2), (name3,2))), (16/04/2010, List((name,2)))
// )

Обратите внимание, что первоначальное отображение необходимо, потому что мы хотим объединить кортежи как элементы в Map и reduByKey для СДР [K, V] требует один и тот же тип данных V до и после преобразования:

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
0 голосов
/ 20 октября 2018

Вы можете преобразовать RDD в DataFrame и просто использовать groupBy с суммой, вот один из способов сделать это

import org.apache.spark.sql.types._
val schema = StructType(StructField("date", StringType, false) :: StructField("name", StringType, false) ::  StructField("value", IntegerType, false) :: Nil)

val rd = sc.parallelize(Seq(("05/05/2020", ("name", 1)),
("05/05/2020", ("name", 1)),
("05/05/2020", ("name2", 1)),
("06/05/2020", ("name", 1))))

val df = spark.createDataFrame(rd.map{ case (a, (b,c)) => Row(a,b,c)},schema)
df.show

+----------+-----+-----+
|      date| name|value|
+----------+-----+-----+
|05/05/2020| name|    1|
|05/05/2020| name|    1|
|05/05/2020|name2|    1|
|06/05/2020| name|    1|
+----------+-----+-----+

val sumdf = df.groupBy("date","name").sum("value")
sumdf.show

+----------+-----+----------+
|      date| name|sum(value)|
+----------+-----+----------+
|06/05/2020| name|         1|
|05/05/2020| name|         2|
|05/05/2020|name2|         1|
+----------+-----+----------+
0 голосов
/ 19 октября 2018

Да .aggeregateBykey() можно использовать следующим образом:

import scala.collection.mutable.HashMap

def merge(map: HashMap[String, Int], element: (String, Int)) = {
 if(map.contains(element._1)) map(element._1) += element._2 else map(element._1) = element._2
 map
}

val input = sc.parallelize(List(("05/05/2020",("name",1)),("05/05/2020", ("name", 1)),("05/05/2020", ("name2", 1)),("06/05/2020", ("name", 1))))

val output = input.aggregateByKey(HashMap[String, Int]())({
  //combining map & tuple   
  case (map, element) => merge(map, element) 
}, {
  // combining two maps 
  case (map1, map2) => {
   val combined = (map1.keySet ++ map2.keySet).map { i=> (i,map1.getOrElse(i,0) + map2.getOrElse(i,0)) }.toMap
   collection.mutable.HashMap(combined.toSeq: _*)
  } 
}).mapValues(_.toList)

кредиты: Лучший способ объединить две карты и суммировать значения одного и того же ключа?

...