Ссылка на (data.csv) и (output.csv)
import org.apache.spark.sql._
object Test {
def main(args: Array[String]) {
val spark = SparkSession.builder()
.appName("Test")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
val tempDF=spark.read.csv("data.csv")
tempDF.coalesce(1).write.parquet("Parquet")
val rdd = sc.textFile("Parquet")
Я преобразовываю data.csv в оптимизированный файл паркета, а затем загружаю его, и теперь яЯ хочу выполнить все преобразования в файле паркета, как я это сделал в CSV-файле, указанном ниже, а затем сохранить его как файл паркета. Ссылка (data.csv) и (output.csv)
val header = rdd.first
val rdd1 = rdd.filter(_ != header)
val resultRDD = rdd1.map { r =>
val Array(country, values) = r.split(",")
country -> values
}.reduceByKey((a, b) => a.split(";").zip(b.split(";")).map { case (i1, i2) => i1.toInt + i2.toInt }.mkString(";"))
import spark.sqlContext.implicits._
val dataSet = resultRDD.map { case (country: String, values: String) => CountryAgg(country, values) }.toDS()
dataSet.coalesce(1).write.option("header","true").csv("output")
}
case class CountryAgg(country: String, values: String)
}