Здесь `mycsv.csv` — это ваш CSV-файл.
`groupByKey(_._1.toLowerCase)` — это то, что вам нужно.
Примечание: подход на основе RDD при работе с большими данными проигрывает в производительности, поскольку в качестве внутреннего формата хранения данных в памяти используется Java-сериализация, тогда как Dataset использует компактный двоичный формат Tungsten. Поэтому всегда предпочитайте Dataset и DataFrame.
package com.examples
import org.apache.log4j.Level
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}
object DataSetGroupTest {
  import java.util.Locale

  // Raise the level of all "org.*" loggers (Spark included) to ERROR so only the example output is printed.
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  /** Reads a headerless, 4-column comma-separated text file, groups its rows
    * case-insensitively by the first column and prints the row count per key,
    * highest count first (ties ordered by key).
    *
    * @param args optional; `args(0)` is the input path, defaulting to `mycsv.csv`
    * @throws IllegalArgumentException (surfaced through the Spark job) if a
    *         non-blank line does not split into exactly 4 fields
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse("mycsv.csv")

    val spark = SparkSession.builder
      .master("local")
      .appName("DataSetGroupTest")
      .getOrCreate()

    // Always release the session (and its local executor), even if the job fails.
    try {
      import spark.implicits._

      // Each line of the input file becomes one String row (column "value").
      val csvData: Dataset[String] = spark.read.textFile(inputPath)
      csvData.show(false)
      //csvData.foreach(println(_))

      // A blank line would split into a single empty field and fail the 4-field match below.
      val words: Dataset[Array[String]] =
        csvData.filter(line => line.trim.nonEmpty).map(line => line.split(","))
      println("convert to array")

      val finalwords: Dataset[(String, String, String, String)] = words.map {
        case Array(f1, f2, f3, f4) => (f1, f2, f3, f4)
        case other =>
          throw new IllegalArgumentException(
            s"Expected 4 comma-separated fields but got ${other.length}: ${other.mkString(",")}")
      }
      // Dataset.foreach runs on the executors: in local mode this prints to this console,
      // on a cluster the output ends up in the executor logs instead.
      finalwords.foreach(println(_))

      // Locale.ROOT makes the grouping key independent of the JVM default locale
      // (e.g. under a Turkish locale "I".toLowerCase is the dotless "ı").
      val groupedWords: KeyValueGroupedDataset[String, (String, String, String, String)] =
        finalwords.groupByKey(_._1.toLowerCase(Locale.ROOT))

      val counted: Dataset[(String, Long)] = groupedWords.count()
      // The count column's name (e.g. "count(1)") is generated by Spark, so address columns
      // by position; the key is a tie-breaker so equal counts print in a deterministic order.
      val keyColumn = counted.columns(0)
      val countColumn = counted.columns(1)
      val counts: Dataset[(String, Long)] =
        counted.sort(counted(countColumn).desc, counted(keyColumn).asc)
      counts.show(false)
    } finally {
      spark.stop()
    }
  }
}
Результат:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------------------------------------------+
|value |
+------------------------------------------+
|Freiburg,2014,Germany,7747 |
|Arsenal,2014,Germany,7745 |
|Arsenal,2014,Germany,7750 |
|Arsenal,2014,Germany,7758 |
|Bayern Munich,2014,Germany,7737 |
|Bayern Munich,2014,Germany,7744 |
|Bayern Munich,2014,Germany,7746 |
|Bayern Munich,2014,Germany,7749 |
|Bayern Munich,2014,Germany,7752 |
|Bayern Munich,2014,Germany,7754 |
|Bayern Munich,2014,Germany,7755 |
|Borussia Dortmund,2014,Germany,7739 |
|Borussia Dortmund,2014,Germany,7740 |
|Borussia Dortmund,2014,Germany,7742 |
|Borussia Dortmund,2014,Germany,7743 |
|Borussia Dortmund,2014,Germany,7756 |
|Borussia Mönchengladbach,2014,Germany,7757|
|Schalke 04,2014,Germany,7741 |
|Schalke 04,2014,Germany,7753 |
|Chelsea,2014,Germany,7751 |
+------------------------------------------+
only showing top 20 rows
convert to array
(Freiburg,2014,Germany,7747)
(Arsenal,2014,Germany,7745)
(Arsenal,2014,Germany,7750)
(Arsenal,2014,Germany,7758)
(Bayern Munich,2014,Germany,7737)
(Bayern Munich,2014,Germany,7744)
(Bayern Munich,2014,Germany,7746)
(Bayern Munich,2014,Germany,7749)
(Bayern Munich,2014,Germany,7752)
(Bayern Munich,2014,Germany,7754)
(Bayern Munich,2014,Germany,7755)
(Borussia Dortmund,2014,Germany,7739)
(Borussia Dortmund,2014,Germany,7740)
(Borussia Dortmund,2014,Germany,7742)
(Borussia Dortmund,2014,Germany,7743)
(Borussia Dortmund,2014,Germany,7756)
(Borussia Mönchengladbach,2014,Germany,7757)
(Schalke 04,2014,Germany,7741)
(Schalke 04,2014,Germany,7753)
(Chelsea,2014,Germany,7751)
(Hannover 96,2014,Germany,7738)
(Real Madrid,2014,Germany,7748)
(Lazio,2014,Germany,7759)
+------------------------+--------+
|value |count(1)|
+------------------------+--------+
|bayern munich |7 |
|borussia dortmund |5 |
|arsenal |3 |
|schalke 04 |2 |
|lazio |1 |
|hannover 96 |1 |
|chelsea |1 |
|real madrid |1 |
|freiburg |1 |
|borussia mönchengladbach|1 |
+------------------------+--------+