Вот как бы вы это сделали
scala> val df = spark.read.json("/tmp/stack/pathi.json")
df: org.apache.spark.sql.DataFrame = [result: struct<data: array<struct<col1:string,col2:string,col3:struct<abc:bigint,aeio:bigint,aero:bigint,ddc:bigint,def:bigint,dyno:bigint>,col4:string>>, total: bigint>, success: boolean]
scala> df.printSchema
root
|-- result: struct (nullable = true)
| |-- data: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- col1: string (nullable = true)
| | | |-- col2: string (nullable = true)
| | | |-- col3: struct (nullable = true)
| | | | |-- abc: long (nullable = true)
| | | | |-- aeio: long (nullable = true)
| | | | |-- aero: long (nullable = true)
| | | | |-- ddc: long (nullable = true)
| | | | |-- def: long (nullable = true)
| | | | |-- dyno: long (nullable = true)
| | | |-- col4: string (nullable = true)
| |-- total: long (nullable = true)
|-- success: boolean (nullable = true)
scala> df.show(false)
+-------------------------------------------------------------------------------------------------------------------------------+-------+
|result |success|
+-------------------------------------------------------------------------------------------------------------------------------+-------+
|[[[value1, value2, [, 5,,,, 3], value4], [value11, value22, [6,, 2,, 9,], value44], [value12, value23, [,,, 6,,], value43]], 3]|true |
+-------------------------------------------------------------------------------------------------------------------------------+-------+
scala> df.select(explode($"result.data")).show(false)
+-----------------------------------------+
|col |
+-----------------------------------------+
|[value1, value2, [, 5,,,, 3], value4] |
|[value11, value22, [6,, 2,, 9,], value44]|
|[value12, value23, [,,, 6,,], value43] |
+-----------------------------------------+
Посмотрев на схему, теперь мы знаем список возможных столбцов внутри "col3", поэтому мы можем вычислить минимум всех этих значений путем жесткого кодирования, как показано ниже
scala> df.select(explode($"result.data")).select(least($"col.col3.abc",$"col.col3.aeio",$"col.col3.aero",$"col.col3.ddc",$"col.col3.def",$"col.col3.dyno")).show(false)
+--------------------------------------------------------------------------------------------+
|least(col.col3.abc, col.col3.aeio, col.col3.aero, col.col3.ddc, col.col3.def, col.col3.dyno)|
+--------------------------------------------------------------------------------------------+
|3 |
|2 |
|6 |
+--------------------------------------------------------------------------------------------+
Динамическая обработка:
Я предполагаю, что до col.col3 структура остается прежней, поэтому мы приступим к созданию другого фрейма данных, как
scala> val df2 = df.withColumn("res_data",explode($"result.data")).select(col("success"),col("res_data"),$"res_data.col3.*")
df2: org.apache.spark.sql.DataFrame = [success: boolean, res_data: struct<col1: string, col2: string ... 2 more fields> ... 6 more fields]
scala> df2.show(false)
+-------+-----------------------------------------+----+----+----+----+----+----+
|success|res_data |abc |aeio|aero|ddc |def |dyno|
+-------+-----------------------------------------+----+----+----+----+----+----+
|true |[value1, value2, [, 5,,,, 3], value4] |null|5 |null|null|null|3 |
|true |[value11, value22, [6,, 2,, 9,], value44]|6 |null|2 |null|9 |null|
|true |[value12, value23, [,,, 6,,], value43] |null|null|null|6 |null|null|
+-------+-----------------------------------------+----+----+----+----+----+----+
Кроме "success" и "res_data", остальные столбцы являются динамическими из "col3"
scala> val p = df2.columns
p: Array[String] = Array(success, res_data, abc, aeio, aero, ddc, def, dyno)
Отфильтруйте эти два и сопоставьте остальные из них с искровыми колоннами
scala> val m = p.filter(_!="success").filter(_!="res_data").map(col(_))
m: Array[org.apache.spark.sql.Column] = Array(abc, aeio, aero, ddc, def, dyno)
Теперь передайте m:_*
в качестве аргумента наименьшей функции, и вы получите результаты, как показано ниже
scala> df2.withColumn("minv",least(m:_*)).show(false)
+-------+-----------------------------------------+----+----+----+----+----+----+----+
|success|res_data |abc |aeio|aero|ddc |def |dyno|minv|
+-------+-----------------------------------------+----+----+----+----+----+----+----+
|true |[value1, value2, [, 5,,,, 3], value4] |null|5 |null|null|null|3 |3 |
|true |[value11, value22, [6,, 2,, 9,], value44]|6 |null|2 |null|9 |null|2 |
|true |[value12, value23, [,,, 6,,], value43] |null|null|null|6 |null|null|6 |
+-------+-----------------------------------------+----+----+----+----+----+----+----+
scala>
Надеюсь, это поможет.