Моя цель - создать куб из 4-х измерений и 1-го измерения.
Это означает, что у меня есть всего 16 GroupBy для вычисления.
В моем коде вы можете видеть 4 измерения (Gender, Age, TotalChildren, ProductCategoryName) и Measure TotalCost.
У меня есть фильтр всех моих столбцов, чтобы удалить любую строку, которая является нулевой.
После этого я вычисляю каждый GroupBy один за другим, а затем использую coalesce (), чтобы связать CSV-файлы в один файл.
Весь процесс занимает около 10 минут, что я считаю слишком много.
Есть ли способ улучшить процесс?Может быть, вычисляя одни групповые из других?
Кроме того, мои данные составляют около 5 ГБ, так что если я прочитал их 16 раз, как число групповых, это означает в общей сложности 80 ГБ.
Вот мой код
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
object ComputeCube {
def main(args:Array[String]):Unit= {
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("SparkProject2018")
.getOrCreate()
import spark.implicits._
val filePath="src/main/resources/dataspark.txt"
val df = spark.read.options(Map("inferSchema"->"true","delimiter"->"|","header"->"true"))
.csv(filePath).select("Gender", "BirthDate", "TotalCost", "TotalChildren", "ProductCategoryName")
val df2 = df
.filter("Gender is not null")
.filter("BirthDate is not null")
.filter("TotalChildren is not null")
.filter("ProductCategoryName is not null")
val currentDate = udf{ (dob: java.sql.Date) =>
import java.time.{LocalDate, Period}
Period.between(dob.toLocalDate, LocalDate.now).getYears
}
val df3 = df2.withColumn("Age", currentDate($"BirthDate"))
val groupByAll = df3.groupBy("Gender","Age", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")
val groupByGenderAndAgeAndTotalChildren = df3.groupBy("Gender","Age", "TotalChildren").avg("TotalCost")
val groupByGenderAndAgeAndProductCategoryName = df3.groupBy("Gender","Age", "ProductCategoryName" ).avg("TotalCost")
val groupByGenderAndTotalChildrenAndProductCategoryName = df3.groupBy("Gender", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")
val groupByAgeAndTotalChildrenAndProductCategoryName = df3.groupBy("Age", "TotalChildren", "ProductCategoryName" ).avg("TotalCost")
val groupByGenderAndAge = df3.groupBy("Gender","Age").avg("TotalCost")
val groupByGenderAndTotalChildren = df3.groupBy("Gender","TotalChildren").avg("TotalCost")
val groupByGenderAndProductCategoryName = df3.groupBy("Gender","ProductCategoryName" ).avg("TotalCost")
val groupByAgeAndTotalChildren = df3.groupBy("Age","TotalChildren").avg("TotalCost")
val groupByAgeAndProductCategoryName = df3.groupBy("Age","ProductCategoryName" ).avg("TotalCost")
val groupByTotalChildrenAndProductCategoryName = df3.groupBy("TotalChildren","ProductCategoryName" ).avg("TotalCost")
val groupByGender = df3.groupBy("Gender").avg("TotalCost")
val groupByAge = df3.groupBy("Age").avg("TotalCost")
val groupByTotalChildren = df3.groupBy("TotalChildren" ).avg("TotalCost")
val groupByProductCategoryName = df3.groupBy("ProductCategoryName" ).avg("TotalCost")
val groupByNone = df3.groupBy().avg("TotalCost")
groupByAll.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/All.csv")
groupByGenderAndAgeAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_Age_TotalChildren.csv")
groupByGenderAndAgeAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_Age_ProductCategoryName.csv")
groupByGenderAndTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_TotalChildren_ProductCategoryName.csv")
groupByAgeAndTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Age_TotalChildren_ProductCategoryName.csv")
groupByGenderAndAge.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_Age.csv")
groupByGenderAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_TotalChildren.csv")
groupByGenderAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender_ProductCategoryName.csv")
groupByAgeAndTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Age_TotalChildren.csv")
groupByAgeAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Age_ProductCategoryName.csv")
groupByTotalChildrenAndProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/TotalChildren_ProductCategoryName.csv")
groupByGender.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Gender.csv")
groupByAge.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/Age.csv")
groupByTotalChildren.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/TotalChildren.csv")
groupByProductCategoryName.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/ProductCategoryName.csv")
groupByNone.coalesce(1).write.format("csv").option("delimiter", "|").option("header", "true")
.mode("overwrite").save("src/main/resources/None.csv")
}
}