Обработка списка вложенных столбцов в Spark огромный массив данных в scala - PullRequest
0 голосов
/ 09 апреля 2020

Я хочу сохранить список вложенных json в кадре данных spark, а также хотел обработать этот столбец. Существует также необходимость в таких операциях, как обновление какого-либо значения или удаление.

{
  "studentName": "abc",
  "mailId": "abc@gmail.com",
  "class" : 7,
  "scoreBoard" : [
    {"subject":"Math","score":90,"grade":"A"},
    {"subject":"Science","score":82,"grade":"A"},
    {"subject":"History","score":80,"grade":"A"},
    {"subject":"Hindi","score":75,"grade":"B"},
    {"subject":"English","score":80,"grade":"A"},
    {"subject":"Geography","score":80,"grade":"A"},
  ]
}

Попытка обработать поле ScoreBoard из приведенных выше данных, найти пять лучших предметов, удалить строку с наименьшим количеством баллов, а также изменить оценку некоторых subject.

case class Student(subject: String, score: Long, grade : String)

var studentTest = sc.read.json("**/testStudent.json")

val studentSchema = ArrayType(new StructType().add("subject", StringType).add("score", LongType).add("grade", StringType))

val parseStudentUDF = udf((scoreBoard : Seq[Row]) => {
 //do data processing  and return updated data
 ListBuffer(Subtable(subject,score,grade), , ,)
}, subtableSchema)


studentTest = studentTest.withColumn("scoreBoard",parseStudentUDF(col("scoreBoard")))

Я не уверен, как преобразовать seq [Row] в DataFrame в UDF или как обработать seq для сортировки dara и удаления любой строки. Есть какой-либо способ сделать это? Любой другой подход также приемлем.

Ответы [ 2 ]

1 голос
/ 09 апреля 2020

этот подход использует кадры данных Spark и Spark SQL. Я надеюсь, что это может помочь вам.

package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ProcessingList {
  val spark = SparkSession
    .builder()
    .appName("ProcessingList")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ProcessingList") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val sqlContext = spark.sqlContext

  val input = "/home/cloudera/files/tests/list_processing.json"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      import org.apache.spark.sql.functions._

      val studentTest = sqlContext
        .read
        .json(input)

      studentTest
        .filter(col("grade").isNotNull)
        .select(col("grade"), col("score"), col("subject"))
        .cache()
        .createOrReplaceTempView("student_test")

      sqlContext
          .sql(
            """SELECT grade, score, subject
              |FROM student_test
              |ORDER BY score DESC
              |LIMIT 5
              |""".stripMargin)
          .show()

      // To have the opportunity to view the web console of Spark: http://localhost:4041/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
+-----+-----+---------+
|grade|score|  subject|
+-----+-----+---------+
|    A|   90|     Math|
|    A|   82|  Science|
|    A|   80|  History|
|    A|   80|  English|
|    A|   80|Geography|
+-----+-----+---------+

С уважением.

0 голосов
/ 09 апреля 2020

Во-первых, комментарий mvasyliv хорош, на мой взгляд.

Чтобы изменить его, вы можете использовать

  • plain scala методов сбора, таких как .filter (), однако тебе не нужна искра для этого. См. Scala коллекций API о том, как использовать
  • . Вы можете написать UnaryTransformers, которые преобразуют определенные c столбцы и вставляют новый. Посмотрите на это простое средство удаления специальных символов в качестве примера. Обратите внимание на метод outputDataType и метод createTransformFun c, основанный на методе коллекции .map.

    class SpecialCharsRemover (override val uid: String)
    extends UnaryTransformer[Seq[String], Seq[String], SpecialCharsRemover]         with DefaultParamsWritable {
    
      def this() = this(Identi
    
    
    
    fiable.randomUID("tokenPermutationGenerato
    
    r"))
    
    override protected def createTransformFunc: Seq[String] => Seq[String] =     (tokensWithSpecialChars: Seq[String]) => {
    
    tokensWithSpecialChars.map(token => {
          removeSpecialCharsImpl(token)
        })
      }
    
      private def removeSpecialCharsImpl(token: String): String =  {
        if(token.equals("")) {
          return
    
     ""
    }
    
    //remove sonderzeichen
    var tempToken = token;
    
    tempToken = tempToken.replace(",", "")
    tempToken = tempToken.replace("'", "")
    tempToken = tempToken.replace("'", "")
    tempToken = tempToken.replace("_", "")
    tempToken = tempToken.replace("-", "")
    tempToken = tempToken.replace("!", "")
    tempToken = tempToken.replace(".", "")
    tempToken = tempToken.replace("?", "")
    tempToken = tempToken.replace(":", "")
    tempToken = tempToken.replace(")", "")
    tempToken = tempToken.replace("(", "")
    tempToken = tempToken.replace(",", "")
    tempToken = tempToken.replace("‘", "")
    tempToken = tempToken.replace("}", "")
    tempToken = tempToken.replace("{", "")
    tempToken = tempToken.replace("[", "")
    tempToken = tempToken.replace("]", "")
    tempToken = tempToken.replace("]", "")
    tempToken = tempToken.replace("®", "")
    
    tempToken = ThesaurusUtils.stemToken(tempToken);
    
    tempToken
    }
    
    override protected def outputDataType: DataType = new ArrayType(StringType,     false)
    }
    

Или вы можете зарегистрировать произвольную функцию в виде UDF (Java Code) :

  ds.sparkSession().sqlContext().udf().register("THE_BOB", (UDF1<String, String>) this::getSomeBob, DataTypes.StringType); 

 private String getSomeBob(String text) {
        return "bob";
    }

затем позвоните по этому номеру:

   bobColumn = functions.callUDF("THE_BOB", bobColumn);
...