Общая обработка полей ListType, MapType, StructType фрейма данных Spark в Scala в UDF? - PullRequest
0 голосов
/ 10 июля 2020

Как выполнить общую обработку Spark StructType в Scala, такую ​​как выбор поля по имени, перебор поля карты / списка и т. Д. c?

В фрейме данных Spark у меня есть «экземпляры» столбца Введите «ArrayType» со следующей схемой:

instances[ArrayType]:
    0 [ StructType:
            name [StringType]
            address[StringType]
            experiences[MapType]:
                Company-1[StringType]:
                    StructType:
                        numYears[IntType]: 5
                        grade[IntType]
                Company-2[StringType]:
                    StructType:
                        numYears[IntType]:  12
                        grade[IntType]]
     1 [ StructType:
            name [StringType]
            address[StringType]
            experiences[MapType]:
                Company-1[StringType]:
                    StructType:
                        numYears[IntType]: 3
                        grade[IntType]
                Company-2[StringType]:
                    StructType:
                        numYears[IntType]:  9
                        grade[IntType]]

Мне нужно преобразовать «экземпляры» этого столбца ArrayType в столбец «totalExperience» типа

derived column "totalExperience" of type "MapType"[StringType -> IntType]
company-1: 8
company-2: 21

Примечание: (5 + 3 = 8 и 12 + 9 = 21)

Эквивалентный псевдокод для этого:

totalExperience = Map<String, Int>();
for (instance in instances) {
    for ((currentExperience, numYears) in instance.getExperiences().entries()) {
         if (!totalExperience.contains(currentExperience)) {
              totalExperience.put(currentExperience, 0);
         }

         totalExperience.put(currentExperience, totalExperience.get(currentExperience) + numYears);
    }
}

return totalExperience

Я написал для этого UDF следующим образом, но я не нашел никакого способа реализовать вышеупомянутый псевдокод. код в Scala -spark:

  private val computeTotalExperience = udf(_ => MapType = (instances: ArrayType) => {
    val totalExperienceByCompany = DataTypes.createMapType(StringType, LongType)

    **How to iterate over "instances" with type as "ArrayType" ?**
    for (instance <- instances) {
      **How to access and iterate over "experiences" mapType field on instance ???**
      // Populate totalExperienceByCompany(MapType) with key as "company-1" name

    }

    delayReasons
  })

Как выполнить приведенную выше общую обработку полей ListType, MapType, StructType фрейма данных Spark в Scala в UDF?

1 Ответ

2 голосов
/ 10 июля 2020

Проверьте код ниже.

scala> df.printSchema
root
 |-- instances: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- experiences: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |    |-- numYears: integer (nullable = true)
 |    |    |    |    |-- grade: string (nullable = true)
 |    |    |-- name: string (nullable = true)
scala> df.show(false)
+-----------------------------------------------------------------------------------------------------------------------------------+
|instances                                                                                                                          |
+-----------------------------------------------------------------------------------------------------------------------------------+
|[[address_0, [Company-1 -> [5, 1], Company-2 -> [12, 1]], name_0], [address_1, [Company-1 -> [3, 1], Company-2 -> [9, 1]], name_1]]|
+-----------------------------------------------------------------------------------------------------------------------------------+                                                                                                                                                                                                   
scala> 
val expr = array(
    struct(lit("company-1").as("company"),$"instance.experiences.Company-1.numYears"),
    struct(lit("company-2").as("company"),$"instance.experiences.Company-2.numYears")
)
       
scala>  

df
.withColumn("instance",explode($"instances"))
.withColumn("company",explode(expr))
.select("company.*")
.groupBy($"company")
.agg(sum($"numYears").as("numYears"))
.select(map($"company",$"numYears").as("totalExperience"))
.show(false) 
                                                                                                                                                       
+-----------------+                                                                                                                                                                                
|totalExperience  |                                                                                                                                                                                
+-----------------+                                                                                                                                                                                
|[company-1 -> 8] |                                                                                                                                                                                
|[company-2 -> 21]|                                                                                                                                                                                
+-----------------+                                                                                                                                                                                
                     
...