Как сгруппировать набор данных в Scala Apache Spark? - PullRequest
1 голос
/ 18 июня 2019

Я хочу сгруппировать свой набор данных по этой первой части моей строки.Таким образом, чтобы сгруппировать его по "SC Freiburg", "Arsenal" и так далее ... Кроме того, чтобы сгруппировать, мне нужно подсчитать количество сгруппированных.

scala> res61.foreach(println)
SC Freiburg,2014,Germany,7747
Arsenal,2014,Germany,7745
Arsenal,2014,Germany,7750
Arsenal,2014,Germany,7758
Bayern Munich,2014,Germany,7737
Bayern Munich,2014,Germany,7744
Bayern Munich,2014,Germany,7746
Bayern Munich,2014,Germany,7749
Bayern Munich,2014,Germany,7752
Bayern Munich,2014,Germany,7754
Bayern Munich,2014,Germany,7755
Borussia Dortmund,2014,Germany,7739
Borussia Dortmund,2014,Germany,7740
Borussia Dortmund,2014,Germany,7742
Borussia Dortmund,2014,Germany,7743
Borussia Dortmund,2014,Germany,7756
Borussia Mönchengladbach,2014,Germany,7757
Schalke 04,2014,Germany,7741
Schalke 04,2014,Germany,7753
Chelsea,2014,Germany,7751
Hannover 96,2014,Germany,7738
Real Madrid,2014,Germany,7748
Lazio,2014,Germany,7759

ПОДСКАЗКА: я должениспользуйте операции rdd, пожалуйста, не предлагайте использовать кадры данных, которые я видел в этом посте: группа наборов искровых данных и сумма Но я не знаю, чтобы воспроизвести его в моем примере.

Это вывод результатаиз моей базы данных postgresql:

result

Ответы [ 3 ]

3 голосов
/ 18 июня 2019

СДР для этого имеет методы groupBy () и groupByKey ().например, для подсчета группы вы можете сделать:

val str ="""SC Freiburg,2014,Germany,7747
   Arsenal,2014,Germany,7745
   ...
"""
val rdd = sc.parallelize(str.split("\n"))
rdd.map (_.split(",")).keyBy(_(0)).groupByKey().map {case (k, v) => (k, v.size)}.collect
2 голосов
/ 18 июня 2019

ваш mycsv - это ваш csv в виде файла.

groupByKey(_._1.toLowerCase)

- это то, что вам нужно.


Примечание: Подход RDD с использованием больших данных - это проблема производительности, поскольку в качестве формата внутренней памяти будет использоваться java-сериализация, если вы используете вольфрамовые наборы данных. Поэтому всегда предпочитайте DataSet и DataFrame .

package com.examples

import org.apache.log4j.Level
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}


object DataSetGroupTest {
org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

def main(args: Array[String]) {

 val spark = SparkSession.builder.
   master("local")
   .appName("DataSetGroupTest")
   .getOrCreate()

 import spark.implicits._
 // if you have a file
 val csvData: Dataset[String] = spark.read.text("mycsv.csv").as[String]

 csvData.show(false)
 //csvData.foreach(println(_))
 val words: Dataset[Array[String]] = csvData.map(value => value.split(","))
 println("convert to array")
 val finalwords: Dataset[(String, String, String, String)] = words.map { case Array(f1, f2, f3, f4) => (f1, f2, f3, f4) }
 finalwords.foreach(println(_))
 val groupedWords: KeyValueGroupedDataset[String, (String, String, String, String)] = finalwords.groupByKey(_._1.toLowerCase)
 val counts: Dataset[(String, Long)] = groupedWords.count().sort($"count(1)".desc)
 counts.show(false)
}
}

Результат:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------------------------------------------+
|value                                     |
+------------------------------------------+
|Freiburg,2014,Germany,7747                |
|Arsenal,2014,Germany,7745                 |
|Arsenal,2014,Germany,7750                 |
|Arsenal,2014,Germany,7758                 |
|Bayern Munich,2014,Germany,7737           |
|Bayern Munich,2014,Germany,7744           |
|Bayern Munich,2014,Germany,7746           |
|Bayern Munich,2014,Germany,7749           |
|Bayern Munich,2014,Germany,7752           |
|Bayern Munich,2014,Germany,7754           |
|Bayern Munich,2014,Germany,7755           |
|Borussia Dortmund,2014,Germany,7739       |
|Borussia Dortmund,2014,Germany,7740       |
|Borussia Dortmund,2014,Germany,7742       |
|Borussia Dortmund,2014,Germany,7743       |
|Borussia Dortmund,2014,Germany,7756       |
|Borussia Mönchengladbach,2014,Germany,7757|
|Schalke 04,2014,Germany,7741              |
|Schalke 04,2014,Germany,7753              |
|Chelsea,2014,Germany,7751                 |
+------------------------------------------+
only showing top 20 rows

convert to array
(Freiburg,2014,Germany,7747)
(Arsenal,2014,Germany,7745)
(Arsenal,2014,Germany,7750)
(Arsenal,2014,Germany,7758)
(Bayern Munich,2014,Germany,7737)
(Bayern Munich,2014,Germany,7744)
(Bayern Munich,2014,Germany,7746)
(Bayern Munich,2014,Germany,7749)
(Bayern Munich,2014,Germany,7752)
(Bayern Munich,2014,Germany,7754)
(Bayern Munich,2014,Germany,7755)
(Borussia Dortmund,2014,Germany,7739)
(Borussia Dortmund,2014,Germany,7740)
(Borussia Dortmund,2014,Germany,7742)
(Borussia Dortmund,2014,Germany,7743)
(Borussia Dortmund,2014,Germany,7756)
(Borussia Mönchengladbach,2014,Germany,7757)
(Schalke 04,2014,Germany,7741)
(Schalke 04,2014,Germany,7753)
(Chelsea,2014,Germany,7751)
(Hannover 96,2014,Germany,7738)
(Real Madrid,2014,Germany,7748)
(Lazio,2014,Germany,7759)
                                                                               +------------------------+--------+
|value                   |count(1)|
+------------------------+--------+
|bayern munich           |7       |
|borussia dortmund       |5       |
|arsenal                 |3       |
|schalke 04              |2       |
|lazio                   |1       |
|hannover 96             |1       |
|chelsea                 |1       |
|real madrid             |1       |
|freiburg                |1       |
|borussia mönchengladbach|1       |
+------------------------+--------+

1 голос
/ 18 июня 2019

Предполагая, что "yourrdd" представляет данные, которые вы показали ранее, вы можете использовать что-то вроде ниже, чтобы получить результат.

yourrdd.groupBy(_(0)).map(x => (x._1,x._2.size)).sortBy((x => x._2),false).collect.foreach(println)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...