Как обработать массив из столбца json в спарке sql dataframe - PullRequest
0 голосов
/ 13 апреля 2020

Ввод Json

{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"}, {"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"English","score":70,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Chemistry","score":72,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|class|grade|mailId       |newSub     |score|scoreBoard                                                                                      |studentName|
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|7    |A    |abc@gmail.com|Environment|95   |[[A,90,Math], [A,82,Science], [A,80,History], [B,75,Hindi], [A,80,English], [A,80,Geography]]   |abc        |
|8    |A    |xyz@gmail.com|Environment|95   |[[A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [B,70,English], [A,87,Biology]]   |xyz        |
|9    |A    |efg@gmail.com|Environment|95   |[[A,91,Math], [B,77,Physics], [B,72,Chemistry], [A,95,Computer], [A,82,English], [B,76,Biology]]|efg        |
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+

Обработка Я хочу -

  1. добавить newSub's json - список ScoreBoard (чтение данных из строки пользователя - newSub, оценка , оценка)

  2. отсортировать их по количеству баллов и удалить json из списка ScoreBoard с меньшим количеством баллов

Ожидаемый результат -

{"studentName": "abc","mailId": "abc@gmail.com","class" : 7,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "xyz@gmail.com","class" : 8,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "efg@gmail.com","class" : 9,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|class|mailId       |scoreBoard                                                                                         |studentName|
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|7    |abc@gmail.com|[[A,95,Environment], [A,90,Math], [A,82,Science], [A,80,History], [A,80,English], [A,80,Geography]]|abc        |
|8    |xyz@gmail.com|[[A,95,Environment], [A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [A,87,Biology]]  |xyz        |
|9    |efg@gmail.com|[[A,95,Environment], [A,91,Math], [B,77,Physics], [A,95,Computer], [A,82,English], [B,76,Biology]] |efg        |
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+

Я пытался

1-й способ - обработка UDF, но Сортировка и удаление json из столбца ScoreBoard в UDF является сложной задачей

2-й способ - взорвать столбец ScoreBoard, получил 6 строк для один студент, каждый для каждого предмета. Задача, с которой я сталкиваюсь, заключается в том, как обрабатывать группы данных, например, как добавить новую строку для нового субъекта, отсортировать оценку каждого пользователя и удалить одну строку.

Нужна помощь в выборе способа решения этой проблемы, если кто-то знает, есть ли какой-нибудь новый / другой эффективный способ сделать ту же обработку. Спасибо !!

Ответы [ 2 ]

1 голос
/ 13 апреля 2020
 import ss.implicits._

  val schema = new ArrayType(new StructType(Array(
    StructField("grade",DataTypes.StringType,true),
    StructField("score",DataTypes.LongType,true),
    StructField("subject",DataTypes.StringType,true))),true)

  def addValue = udf((array: Seq[Row], newval:Row)=> array ++ Array(newval),schema)

  def sortAndRemove = udf((array: Seq[Row])=> array.sortBy(x=>x.getAs[Long]("score"))(Ordering[Long].reverse).slice(0,array.length-1),schema)

val df2 =  df.withColumn("map_col",struct(col("grade"),col("score"),col("newSub").as("subject")))
    .withColumn("scoreBoard",sortAndRemove(addValue(col("scoreBoard"),col("map_col"))))
  df2.select("scoreBoard").show(false)

Подход UDF, где ss - SparkSession. addvalue можно заменить на array_union, если вы используете версию 2.4 и выше.

Код выше будет работать для версии 2.0 и выше

0 голосов
/ 13 апреля 2020

при таком подходе используются кадры данных / наборы данных Spark и Spark SQL.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}

object ProcessingList {
  val spark = SparkSession
    .builder()
    .appName("ProcessingList")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ProcessingList") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val sqlContext = spark.sqlContext

  val input = "/home/cloudera/files/tests/list_processing.json"

  case class Student(cl: Long, grade: String,mail : String,ns: String,score: Long,sbGrade: String, sbScore: Long,sbSubject: String, name: String)

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      val studentTest = sqlContext
        .read
        .json(input)
        .flatMap(r => r.getSeq(5).map( (sq: Row)  => Student(r.getLong(0), r.getString(1), r.getString(2), r.getString(3), r.getLong(4),sq.getString(0),sq.getLong(1), sq.getString(2), r.getString(6)))).as[Student]
        .cache()

      studentTest.show(truncate = false)

      studentTest.createOrReplaceTempView("student_test")

      sqlContext
          .sql(
            """
              |SELECT cl, grade, mail,ns, score, 
              |RANK() OVER(PARTITION BY cl ORDER BY sbScore DESC) AS ranking, 
              |sbGrade,sbScore, sbSubject, name
              |FROM student_test
              |ORDER BY cl
              |""".stripMargin)
          .show(truncate = false)


      // To have the opportunity to view the web console of Spark: http://localhost:4041/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

и ожидаемые результаты

+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
|cl |grade|mail         |ns         |score|points|sbGrade|sbScore|sbSubject|name|
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
|7  |A    |abc@gmail.com|Environment|95   |1     |A      |90     |Math     |abc |
|7  |A    |abc@gmail.com|Environment|95   |2     |A      |82     |Science  |abc |
|7  |A    |abc@gmail.com|Environment|95   |3     |A      |80     |History  |abc |
|7  |A    |abc@gmail.com|Environment|95   |3     |A      |80     |English  |abc |
|7  |A    |abc@gmail.com|Environment|95   |3     |A      |80     |Geography|abc |
|7  |A    |abc@gmail.com|Environment|95   |6     |B      |75     |Hindi    |abc |
|8  |A    |xyz@gmail.com|Environment|95   |1     |A      |90     |Math     |xyz |
|8  |A    |xyz@gmail.com|Environment|95   |2     |A      |87     |Biology  |xyz |
|8  |A    |xyz@gmail.com|Environment|95   |3     |A      |85     |Physics  |xyz |
|8  |A    |xyz@gmail.com|Environment|95   |4     |A      |80     |Chemistry|xyz |
|8  |A    |xyz@gmail.com|Environment|95   |5     |B      |75     |Hindi    |xyz |
|8  |A    |xyz@gmail.com|Environment|95   |6     |B      |70     |English  |xyz |
|9  |A    |efg@gmail.com|Environment|95   |1     |A      |95     |Computer |efg |
|9  |A    |efg@gmail.com|Environment|95   |2     |A      |91     |Math     |efg |
|9  |A    |efg@gmail.com|Environment|95   |3     |A      |82     |English  |efg |
|9  |A    |efg@gmail.com|Environment|95   |4     |B      |77     |Physics  |efg |
|9  |A    |efg@gmail.com|Environment|95   |5     |B      |76     |Biology  |efg |
|9  |A    |efg@gmail.com|Environment|95   |6     |B      |72     |Chemistry|efg |
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+

Надеюсь, это может быть полезно.

С уважением.

...