Я не уверен, что именно вы имеете в виду под "Оптимизированным", но более "scala-y" и "spark-y" способ сделать это может быть следующим:
import org.apache.spark.sql.expressions.Window
// Read your data file as a CSV file with row headers.
val marksDF = spark.read.option("header","true").csv("marks.csv")
// Calculate the total marks for each student in each year. The new total mark column will be called "totMark"
val marksByStudentYear = marksDF.groupBy(col("year"), col("student")).agg(sum(col("marks")).as("totMark"))
// Rank the marks within each year. Highest Mark will get rank 1, second highest rank 2 and so on.
// A benefit of rank is that if two scores have the same mark, they will both get the
// same rank.
val marksRankedByYear = marksByStudentYear.withColumn("rank", dense_rank().over(Window.partitionBy("year").orderBy($"totMark".desc)))
// Finally filter so that we only have the "top scores" (rank = 1) for each year,
// order by year and student name and display the result.
val topStudents = marksRankedByYear.filter($"rank" === 1).orderBy($"year", $"student").show
topStudents.show
Это даст следующий результат в Spark-shell:
+----+-------+-------+----+
|year|student|totMark|rank|
+----+-------+-------+----+
|2016| raj| 278.0| 1|
|2017| raghu| 288.0| 1|
|2017| rajesh| 288.0| 1|
+----+-------+-------+----+
Если вам нужен CSV, отображаемый в соответствии с вашим вопросом, вы можете использовать:
topStudents.collect.map(_.mkString(",")).foreach(println)
, который производит:
2016,raj,278.0,1
2017,raghu,288.0,1
2017,rajesh,288.0,1
Я разбил процесс на отдельные этапы. Это позволит вам увидеть, что происходит на каждом этапе, просто запустив show для промежуточного результата. Например, чтобы увидеть, что делает spark.read.option ..., просто введите marksDF.show в spark-shell
Так как OP хотел версию RDD, вот один пример. Возможно, это не оптимально, но дает правильный результат:
import org.apache.spark.rdd.RDD
// A Helper function which makes it slightly easier to view RDD content.
def dump[R] (rdd : RDD[R]) = rdd.collect.foreach(println)
val marksRdd = sc.textFile("marks.csv")
// A case class to annotate the content in the RDD
case class Report(year:Int, student:String, sub:String, mark:Int)
// Create the RDD as a series of Report objects - ignore the header.
val marksReportRdd = marksRdd.map(_.split(",")).mapPartitionsWithIndex {
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}.map(r => Report(r(0).toInt,r(1),r(2),r(3).toInt))
// Group the data by year and student.
val marksGrouped = marksReportRdd.groupBy(report => (report.year, report.student))
// Calculate the total score for each student for each year by adding up the scores
// of each subject the student has taken in that year.
val totalMarkStudentYear = marksGrouped.map{ case (key, marks:Iterable[Report]) => (key, marks.foldLeft(0)((acc, rep) => acc + rep.mark))}
// Determine the highest score for each year.
val yearScoreHighest = totalMarkStudentYear.map{ case (key, score:Int) => (key._1, score) }.reduceByKey(math.max(_, _))
// Determine the list of students who have received the highest score in each year.
// This is achieved by joining the total marks each student received in each year
// to the highest score in each year.
// The join is performed on the key which must is a Tuple2(year, score).
// To achieve this, both RDD's must be mapped to produce this key with a data attribute.
// The data attribute for the highest scores is a dummy value "x".
// The data attribute for the student scores is the student's name.
val highestRankStudentByYear = totalMarkStudentYear.map{ case (key, score) => ((key._1, score), key._2)}.join (yearScoreHighest.map (k => (k, "x")))
// Finally extract the year, student name and score from the joined RDD
// Sort by year and name.
val result = highestRankStudentByYear.map{ case (key, score) => (key._1, score._1, key._2)}.sortBy( r => (r._1, r._2))
// Show the final result.
dump(result)
val result = highestRankStudentByYear.map{ case (key, score) => (key._1, score._1, key._2)}.sortBy( r => (r._1, r._2))
dump(result)
Результат вышеописанного:
(2016,raj,278)
(2017,raghu,288)
(2017,rajesh,288)
Как и прежде, вы можете просматривать промежуточные RDD, просто выгрузив их с помощью функции dump. Примечание: функция дампа принимает RDD. Если вы хотите показать содержимое DataFrame или набора данных, используйте его метод show.
Возможно, существует более оптимальное решение, чем приведенное выше, но оно выполняет свою работу.
Надеемся, что версия RDD побудит вас использовать DataFrames и / или DataSets, если вы можете. Не только код проще, но:
- Spark оценит DataFrames и DataSets и сможет оптимизировать весь процесс преобразования. СДР не являются (то есть они выполняются один за другим без оптимизации). Переводы процессов на основе DataFrame и DataSet, скорее всего, будут работать быстрее (при условии, что вы не оптимизируете эквивалент RDD вручную)
- DataSets и DataFrames допускают схемы в различной степени (например, именованные столбцы и типизация данных).
- DataFrames и DataSets можно запрашивать с помощью SQL.
- Операции / методы DataFrame и DataSet более согласованы с конструкциями SQL
- DataFrames и DataSet проще в использовании, чем RDD
- DataSets (и RDD) предлагают обнаружение ошибок времени компиляции.
- DataSets - будущее направление.
Проверьте эти несколько ссылок для получения дополнительной информации:
https://data -flair.training / блоги / апач-искровым РДД-против-dataframe-против-набора данных /
https://www.linkedin.com/pulse/apache-spark-rdd-vs-dataframe-dataset-chandan-prakash/
https://medium.com/@sachee/apache-spark-dataframe-vs-rdd-24a04d2eb1b9
или просто гугл "спарк, если я использую rdd или dataframe"
Всего наилучшего в вашем проекте.