Как уменьшить Список [Ключ, Список [Имя, Значение]] в Spark? - PullRequest
1 голос
/ 25 февраля 2020

Это структура моих моделей

package object summary {
  case class NameValuePair(name: String, value: Long)

  case class Result(key: String, pairs: List[NameValuePair])

  case class Data(data: List[Result])
}

Данные будут выглядеть как

[
Result("Paris", List[NameValuePair("apples",10),NameValuePair("oranges",20),NameValuePair("peaches",30)]),
Result("Paris", List[NameValuePair("apples",20),NameValuePair("oranges",30),NameValuePair("peaches",40)]),
Result("NY", List[NameValuePair("apples",20),NameValuePair("oranges",30),NameValuePair("peaches",40)]),
Result("NY", List[NameValuePair("apples",40),NameValuePair("oranges",30),NameValuePair("peaches",10)]),
Result("London", List[NameValuePair("apples",20),NameValuePair("oranges",30),NameValuePair("peaches",40)])
]

Я хочу вывод, как показано ниже

[
("Paris", [("apples", 30),("oranges", 50),("peaches",70)]),
("NY", [("apples", 60),("oranges", 60),("peaches",50)]),
("London", [("apples", 20),("oranges", 30),("peaches",40)])
]

Я хочу чтобы найти сумму подсчета фруктов по городам. Как это сделать с помощью spark?

Ответы [ 3 ]

2 голосов
/ 25 февраля 2020

Этого можно достичь, используя искровые RDD, например:

Я заново создал ваши данные для создания RDD:

val data_test =
List(Result("Paris", List( new NameValuePair("apples",10),new NameValuePair("oranges",20), new NameValuePair("peaches",30))),
Result("Paris", List( new NameValuePair("apples",20), new NameValuePair("oranges",30),new NameValuePair("peaches",40))),
Result("NY", List(new NameValuePair("apples",20),new NameValuePair("oranges",30), new NameValuePair("peaches",40))),
Result("NY", List(new NameValuePair("apples",40), new NameValuePair("oranges",30), new NameValuePair("peaches",10))),
Result("London", List(new NameValuePair("apples",20),new NameValuePair("oranges",30),new NameValuePair("peaches",40))) )

Затем я создал RDD из data_test и применил к нему преобразования. , вот код:

val rdd_data = sc.parallelize(data_test)
val rdd_1 = rdd_data.map(x => ((x.key,x.pairs(0).name),x.pairs(0).value))
val rdd_2 = rdd_data.map(x => ((x.key,x.pairs(1).name),x.pairs(1).value))
val rdd_3 = rdd_data.map(x => ((x.key,x.pairs(2).name),x.pairs(2).value))
val rdd_final = rdd_1.union(rdd_2).union(rdd_3)
val rdd_reduce = rdd_final.reduceByKey((x,y) => x+y)
val rdd_transformed = rdd_reduce.map(x=>(x._1._1,(x._1._2,x._2))).groupByKey().map(x=>(x._1,x._2.toList))
rdd_transformed.foreach(println)

Полученный результат выглядит так:

(NY,List((peaches,50), (apples,60), (oranges,60)))
(London,List((apples,20), (peaches,40), (oranges,30)))
(Paris,List((oranges,50), (peaches,70), (apples,30)))

[РЕДАКТИРОВАТЬ ПОСЛЕ КОММЕНТАРИИ] Если количество пар меняется, вы можете определить функцию следующим образом:

def func(res : Result): List[((String,String),Long)] = {
    var r = List[((String,String),Long)]()
    var i = List[NameValuePair]()
    for(i <- res.pairs){
        val tt : ((String,String),Long)= ((res.key,i.name),i.value)
        r = tt :: r
    }
    return r
}

Затем вы можете сразу перейти к строке, где я генерирую rdd_final выше, примерно так:

val rdd_final = rdd_data.flatMap(x=>func(x))

Затем выполнить другие инструкции таким же образом.

0 голосов
/ 25 февраля 2020

Spark может преобразовать ваш список в Dataframe, просто импортировав spark.implicits._. Затем вы можете позвонить list.toDF, чтобы преобразовать ваш список в Spark Dataframe. Далее мы будем использовать explode с агрегацией sum через API данных:

import org.apache.spark.sql.functions.{explode, sum, collect_list, struct}
import spark.implicits._

val df = List(
  Result("Paris", List(NameValuePair("apples",10),NameValuePair("oranges",20),NameValuePair("peaches",30))),
  Result("Paris", List(NameValuePair("apples",20),NameValuePair("oranges",30),NameValuePair("peaches",40))),
  Result("NY", List(NameValuePair("apples",20),NameValuePair("oranges",30),NameValuePair("peaches",40))),
  Result("NY", List(NameValuePair("apples",40),NameValuePair("oranges",30),NameValuePair("peaches",10))),
  Result("London", List(NameValuePair("apples",20),NameValuePair("oranges",30),NameValuePair("peaches",40)))
).toDF

df.select($"key".as("city"), explode($"pairs").as("nameValuePair"))
  .groupBy($"city", $"nameValuePair.name".as("fruit"))
  .agg(
    sum("nameValuePair.value").as("sum")
  )
  .groupBy("city")
  .agg(
    collect_list(
      struct("fruit", "sum")
    ).as("fruit_stats")
  ).show(false)

// +------+--------------------------------------------+
// |city  |fruit_stats                                 |
// +------+--------------------------------------------+
// |London|[[peaches, 40], [apples, 20], [oranges, 30]]|
// |Paris |[[peaches, 70], [apples, 30], [oranges, 50]]|
// |NY    |[[apples, 60], [peaches, 50], [oranges, 60]]|
// +------+--------------------------------------------+

Анализ: Сначала мы взрываем массив пар, затем группируем по (город, фрукты) чтобы получить сумму фруктов для каждого города. Наконец мы группируем по городам, чтобы создать окончательный список struct("fruit", "sum").

0 голосов
/ 25 февраля 2020

Я бы использовал dataframe group by function для этого. Как это:

import spark.implicits._
Seq(
  Result("Paris", List( new NameValuePair("apples",10),new NameValuePair("oranges",20), new NameValuePair("peaches",30))),
  Result("Paris", List( new NameValuePair("apples",20), new NameValuePair("oranges",30),new NameValuePair("peaches",40))),
  Result("NY", List(new NameValuePair("apples",20),new NameValuePair("oranges",30), new NameValuePair("peaches",40))),
  Result("NY", List(new NameValuePair("apples",40), new NameValuePair("oranges",30), new NameValuePair("peaches",10))),
  Result("London", List(new NameValuePair("apples",20),new NameValuePair("oranges",30),new NameValuePair("peaches",40)))
).flatMap(row => {
  val city = row.key
  val fruits = row.pairs
  fruits.map(f => {
    val fruitName = f.name
    val v = f.value
    (city, fruitName, v)
  })
}).toDF("city", "fruit", "value")
  .groupBy("city").sum().show()
//The result would be:
+------+----------+
|  city|sum(value)|
+------+----------+
|London|        90|
| Paris|       150|
|    NY|       170|
+------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...