Concat значения из столбца и сделать еще один столбец - PullRequest
0 голосов
/ 27 ноября 2018

Я работаю с Spark SQL и выполняю некоторые операции SQL над таблицей Hive.Моя таблица выглядит следующим образом: `` `

ID COST CODE
1  100  AB1
5  200  BC3
1  400  FD3
6  600  HJ2
1  900  432
3  800  DS2
2  500  JT4 

` ``

Я хочу создать из этого еще одну таблицу, которая будет иметь общую стоимость и топ-5 кодов в цепочкев другом столбце, как это.

`` `

ID  TOTAL_COST  CODE  CODE_CHAIN
1   1400        432   432, FD3, AB1

` ``

Общая стоимость проста, но, как объединить значения из столбца CODE и формыдругой столбец.

Я пробовал функцию collect_set, но значения не могут быть ограничены, а также неправильно отсортированы, возможно, из-за распределенной обработки.

Возможна любая логика SQL?

РЕДАКТИРОВАТЬ:

Мне нужно отсортировать данные, поэтому я получаю 5 лучших значений.

Ответы [ 2 ]

0 голосов
/ 28 ноября 2018

Используйте оконную функцию и таблицу () для фильтрации по первому номеру строки.Проверьте это:

scala> val df = Seq((1,100,"AB1"),(5,200,"BC3"),(1,400,"FD3"),(6,600,"HJ2"),(1,900,"432"),(3,800,"DS2"),(2,500,"JT4")).toDF("ID","COST","CODE")
df: org.apache.spark.sql.DataFrame = [ID: int, COST: int ... 1 more field]

scala> df.show()
+---+----+----+
| ID|COST|CODE|
+---+----+----+
|  1| 100| AB1|
|  5| 200| BC3|
|  1| 400| FD3|
|  6| 600| HJ2|
|  1| 900| 432|
|  3| 800| DS2|
|  2| 500| JT4|
+---+----+----+


scala> df.createOrReplaceTempView("course")

scala> spark.sql(""" with tab1(select id,cost,code,collect_list(code) over(partition by id order by cost desc rows between current row and 5 following ) cc, row_number() over(partition by id order by cost desc) rc,sum(cost) over(partition by id order by cost desc rows between current row and 5 following) total from course) select id, total, cc from tab1 where rc=1 """).show(false)
+---+-----+---------------+
|id |total|cc             |
+---+-----+---------------+
|1  |1400 |[432, FD3, AB1]|
|6  |600  |[HJ2]          |
|3  |800  |[DS2]          |
|5  |200  |[BC3]          |
|2  |500  |[JT4]          |
+---+-----+---------------+


scala>
0 голосов
/ 27 ноября 2018

Используйте slice, sort_array и collect_list

import org.apache.spark.sql.functions._

df
  .groupBy("id")
  .agg(
    sum("cost") as "total_cost", 
    slice(sort_array(collect_list(struct($"cost", $"code")), false), 1, 5)("code") as "codes")

В Spark 2.3 вам придется заменить slice на ручную индексацию отсортированного массива

val sorted = sort_array(collect_list(struct($"cost", $"code")), false)("code")
val codes = array((0 until 5).map(i => sorted.getItem(i)): _*) as "codes"
...