Я знакомлюсь со Spark и Scala, и моя текущая задача - «сложить» эти два кадра данных:
+---+--------+-------------------+
|cyl|avg(mpg)| var_samp(mpg)|
+---+--------+-------------------+
| 8| 15.8| 1.0200000000000014|
| 6| 20.9|0.48999999999999966|
| 4| 33.9| 0.0|
+---+--------+-------------------+
+---+------------------+------------------+
|cyl| avg(mpg)| var_samp(mpg)|
+---+------------------+------------------+
| 8| 13.75| 6.746999999999998|
| 6| 21.4| NaN|
+---+------------------+------------------+
В этом случае «ключ» равен cyl
, а «значения» avg(mpg)
и var_samp(mpg)
.
(Примерный) результат для этих двух будет:
+---+--------+-------------------+
|cyl|avg(mpg)| var_samp(mpg)|
+---+--------+-------------------+
| 8| 29.55| 7.76712|
| 6| 42.3|0.48999999999999966|
| 4| 33.9| 0.0|
+---+--------+-------------------+
Обратите внимание, что NaN
считается равным нулю, а также как могут отсутствовать «ключи» в некоторых кадрах данных (во втором отсутствует 4 ключа).
Я подозреваю, что reduceByKey
будет подходить сюда, но не может заставить его работать.
Вот мой код:
case class Cars(car: String, mpg: String, cyl: String, disp: String, hp: String,
drat: String, wt: String, qsec: String, vs: String, am: String, gear: String, carb: String)
object Bootstrapping extends App {
override def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark and SparkSql").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// Exploring SparkSQL
// Initialize an SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
import sqlContext.implicits._
// Load a cvs file
val csv = sc.textFile("mtcars.csv")
// Create a Spark DataFrame
val headerAndRows = csv.map(line => line.split(",").map(_.trim))
val header = headerAndRows.first
val mtcdata = headerAndRows.filter(_(0) != header(0))
val mtcars = mtcdata
.map(p => Cars(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11)))
.toDF
// Aggregate data after grouping by columns
import org.apache.spark.sql.functions._
mtcars.sort($"cyl").show()
mtcars.groupBy("cyl").agg(avg("mpg"), var_samp("mpg")).sort($"cyl").show()
//sample 25% of the population without replacement
val sampledData = mtcars.sample(false, 0.25)
//bootstrapping loop
for (a <- 1 to 5) {
//get bootstrap sample
val bootstrapSample = sampledData.sample(true, 1)
//HERE!!! I WANT TO SAVE THE AGGREGATED SUM OF THE FOLLOWING:
bootstrapSample.groupBy("cyl").agg(avg("mpg"), var_samp("mpg"))
}
}
}
Вот данные, которые я использую: Автомобильные дорожные тесты Motor Trend