минимальное значение столбца типа структуры в кадре данных - PullRequest
0 голосов
/ 16 января 2019

У меня вопрос ниже файла json, где он содержит данные типа структуры для column3. Я могу извлечь строки, но не могу найти минимальное значение column3. Где column3 содержит динамические вложенные столбцы (динамические имена) со значениями.

входные данные:

"result": { "data" : 
[ {"col1": "value1",  "col2": "value2",  "col3" : { "dyno" : 3, "aeio": 5 }, "col4": "value4"},
   {"col1": "value11", "col2": "value22", "col3" : { "abc" : 6, "def": 9 , "aero": 2}, "col4": "value44"},
   {"col1": "value12", "col2": "value23", "col3" : { "ddc" : 6}, "col4": "value43"}]  

outputDate, ожидаемое как:

col1    col2    col3    col4    col5(min value of col3)

value1  value2  [3,5]   value4  3

value11 value22 [6,9,2] value44 2

value12 value23 [6] value43 6

Я могу прочитать файл и разобрать данные как записи, но не могу найти минимальное значение col3.

val bestseller_df1 = bestseller_json.withColumn("extractedresult", explode(col("result.data")))

Не могли бы вы помочь мне написать код, чтобы найти минимальное значение col3 в spark / scala.

мой файл json:

{"success":true, "result": { "data": [ {"col1": "value1",  "col2": "value2",  "col3" : { "dyno" : 3, "aeio": 5 }, "col4": "value4"},{"col1": "value11", "col2": "value22", "col3" : { "abc" : 6, "def": 9 , "aero": 2}, "col4": "value44"},{"col1": "value12", "col2": "value23", "col3" : { "ddc" : 6}, "col4": "value43"}],"total":3}}

Ответы [ 2 ]

0 голосов
/ 17 января 2019

dbutils.fs.put ("/ tmp / test.json", "" "

{"col1": "value1", "col2": "value2", "col3": {"dyno": 3, "aeio": 5}, "col4": "value4"},

{"col1": "value11", "col2": "value22", "col3": {"abc": 6, "def": 9, "aero": 2}, "col4": "value44 «},

{"col1": "value12", "col2": "value23", "col3": {"ddc": 6}, "col4": "value43"}]} "" ", верно)

val df_json = spark.read.json ("/ tmp / test.json")

val tf = df_json.withColumn ("col3", взорваться (массив ($ "col3. *"))). ToDF

val tmp_group = tf.groupBy ("col1"). Agg (min (tf.col ("col3")). Alias ​​("col3"))

val top_rows = tf.join (tmp_group, Seq ("col3", "col1"), "inner")

top_rows.select ("col1", "col2", "col3", "col4"). Show ()

Написал 282 байта.

+ ------- + ------- + ---- + ------- +

| col1 | col2 | col3 | COL4 |

+ ------- + ------- + ---- + ------- +

| value1 | значение2 | 3 | value4 |

| value11 | value22 | 2 | value44 |

| value12 | value23 | 6 | value43 |

+ ------- + ------- + ---- + ------- +

0 голосов
/ 16 января 2019

Вот как бы вы это сделали

scala> val df = spark.read.json("/tmp/stack/pathi.json")
df: org.apache.spark.sql.DataFrame = [result: struct<data: array<struct<col1:string,col2:string,col3:struct<abc:bigint,aeio:bigint,aero:bigint,ddc:bigint,def:bigint,dyno:bigint>,col4:string>>, total: bigint>, success: boolean]

scala> df.printSchema
root
 |-- result: struct (nullable = true)
 |    |-- data: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- col1: string (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |    |    |    |-- col3: struct (nullable = true)
 |    |    |    |    |-- abc: long (nullable = true)
 |    |    |    |    |-- aeio: long (nullable = true)
 |    |    |    |    |-- aero: long (nullable = true)
 |    |    |    |    |-- ddc: long (nullable = true)
 |    |    |    |    |-- def: long (nullable = true)
 |    |    |    |    |-- dyno: long (nullable = true)
 |    |    |    |-- col4: string (nullable = true)
 |    |-- total: long (nullable = true)
 |-- success: boolean (nullable = true)


scala> df.show(false)
+-------------------------------------------------------------------------------------------------------------------------------+-------+
|result                                                                                                                         |success|
+-------------------------------------------------------------------------------------------------------------------------------+-------+
|[[[value1, value2, [, 5,,,, 3], value4], [value11, value22, [6,, 2,, 9,], value44], [value12, value23, [,,, 6,,], value43]], 3]|true   |
+-------------------------------------------------------------------------------------------------------------------------------+-------+

scala> df.select(explode($"result.data")).show(false)
+-----------------------------------------+
|col                                      |
+-----------------------------------------+
|[value1, value2, [, 5,,,, 3], value4]    |
|[value11, value22, [6,, 2,, 9,], value44]|
|[value12, value23, [,,, 6,,], value43]   |
+-----------------------------------------+

Посмотрев на схему, теперь мы знаем список возможных столбцов внутри "col3", поэтому мы можем вычислить минимум всех этих значений путем жесткого кодирования, как показано ниже

scala> df.select(explode($"result.data")).select(least($"col.col3.abc",$"col.col3.aeio",$"col.col3.aero",$"col.col3.ddc",$"col.col3.def",$"col.col3.dyno")).show(false)
+--------------------------------------------------------------------------------------------+
|least(col.col3.abc, col.col3.aeio, col.col3.aero, col.col3.ddc, col.col3.def, col.col3.dyno)|
+--------------------------------------------------------------------------------------------+
|3                                                                                           |
|2                                                                                           |
|6                                                                                           |
+--------------------------------------------------------------------------------------------+

Динамическая обработка:

Я предполагаю, что до col.col3 структура остается прежней, поэтому мы приступим к созданию другого фрейма данных, как

scala> val df2 = df.withColumn("res_data",explode($"result.data")).select(col("success"),col("res_data"),$"res_data.col3.*")
df2: org.apache.spark.sql.DataFrame = [success: boolean, res_data: struct<col1: string, col2: string ... 2 more fields> ... 6 more fields]

scala> df2.show(false)
+-------+-----------------------------------------+----+----+----+----+----+----+
|success|res_data                                 |abc |aeio|aero|ddc |def |dyno|
+-------+-----------------------------------------+----+----+----+----+----+----+
|true   |[value1, value2, [, 5,,,, 3], value4]    |null|5   |null|null|null|3   |
|true   |[value11, value22, [6,, 2,, 9,], value44]|6   |null|2   |null|9   |null|
|true   |[value12, value23, [,,, 6,,], value43]   |null|null|null|6   |null|null|
+-------+-----------------------------------------+----+----+----+----+----+----+

Кроме "success" и "res_data", остальные столбцы являются динамическими из "col3"

scala> val p = df2.columns
p: Array[String] = Array(success, res_data, abc, aeio, aero, ddc, def, dyno)

Отфильтруйте эти два и сопоставьте остальные из них с искровыми колоннами

scala> val m = p.filter(_!="success").filter(_!="res_data").map(col(_))
m: Array[org.apache.spark.sql.Column] = Array(abc, aeio, aero, ddc, def, dyno)

Теперь передайте m:_* в качестве аргумента наименьшей функции, и вы получите результаты, как показано ниже

scala> df2.withColumn("minv",least(m:_*)).show(false)
+-------+-----------------------------------------+----+----+----+----+----+----+----+
|success|res_data                                 |abc |aeio|aero|ddc |def |dyno|minv|
+-------+-----------------------------------------+----+----+----+----+----+----+----+
|true   |[value1, value2, [, 5,,,, 3], value4]    |null|5   |null|null|null|3   |3   |
|true   |[value11, value22, [6,, 2,, 9,], value44]|6   |null|2   |null|9   |null|2   |
|true   |[value12, value23, [,,, 6,,], value43]   |null|null|null|6   |null|null|6   |
+-------+-----------------------------------------+----+----+----+----+----+----+----+


scala>

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...