У меня есть фрейм данных со многими столбцами, и я хочу получить максимальное, минимальное, стандартное, среднее, среднее и нулевое число.Я получаю результат dataframe с циклом.Но это так медленно.Мой код выглядит следующим образом:
def std(sparkSession: SparkSession, feature:String, df:DataFrame): DataFrame = {
val df1 = df.select(feature).filter(x=>x(0)!=null && x(0)!="null") // need filter null and "null"
val max = df1.rdd.map(x=>x(0).toString.toDouble).max()
val min = df1.rdd.map(x=>x(0).toString.toDouble).min()
val sum = df1.rdd.map(x=>x(0).toString.toDouble).sum()
val count = df1.count().toDouble
val mean = sum / count
val median = df1.stat.approxQuantile(feature, Array(0.5), 0.001)(0)
val variance = df1.withColumn("power", col(feature)*col(feature)).select("power").rdd.map(_(0).toString.toDouble).sum() / count - mean * mean
val std = breeze.numerics.sqrt(variance)
import sparkSession.implicits._
Seq(
(feature, "count", count.toString)
,(feature, "median", median.toString)
,(feature, "max", max.toString())
,(feature, "min", min.toString())
,(feature, "mean", mean.toString)
,(feature, "std", std.toString)
).toDF("feature", "type", "value")
}
for(j <- 0 until stdArr.length) {
var stdDF = std(sparkSession, stdArr(j), userFeatureDF)
stdDF += stdDF // pseudo code, get the union of all dataframe
}
И stdArr
- это массив имен столбцов с фреймом данных userFeatureDF
.
Как получить окончательный stdDF
в параллельном режиме?