Spark DataFrame агрегирует значения - PullRequest
0 голосов
/ 31 октября 2019

Ввод данных кадра

+-----------------+-------+
|Id               | value |
+-----------------+-------+
|             1622| 139685|
|             1622| 182118|
|             1622| 127955|
|             3837|3224815|
|             1622| 727761|
|             1622| 155875|
|             3837|1504923|
|             1622| 139684|
|             1453| 536111|
+-----------------+-------+

Вывод:

    +-----------------+--------------------------------------------+
    |Id               | value                                      |
    +-----------------+--------------------------------------------+
    |             1622|[139685,182118,127955,727761,155875,139684] |
    |             1453| 536111                                     |
    |             3837|[3224815,1504923]                           |
    +-----------------+--------------------------------------------+


Когда конкретный id имеет более одного значения, он должен быть собран в формате array, иначе следует учитывать, чтокак одно значение without brace []

Я пытался использовать приведенное ниже решение для связи, но не смог обработать условие if-else во фрейме данных.

ссылка: Spark DataFrame агрегированные значения столбца по ключу в список

1 Ответ

2 голосов
/ 31 октября 2019

используйте оконную функцию

scala> import org.apache.spark.sql.expressions.Window
scala> var df = Seq((1622, 139685),(1622, 182118),(1622, 127955),(3837,3224815),(1622, 727761),(1622, 155875),(3837,1504923),(1622, 139684),(1453, 536111)).toDF("id","value")

scala> df.show()
+----+-------+
|  id|  value|
+----+-------+
|1622| 139685|
|1622| 182118|
|1622| 127955|
|3837|3224815|
|1622| 727761|
|1622| 155875|
|3837|1504923|
|1622| 139684|
|1453| 536111|
+----+-------+

scala> var df1= df.withColumn("r",count($"id").over(Window.partitionBy("id").orderBy("id")).cast("int"))

scala> df1.show()
+----+-------+---+
|  id|  value|  r|
+----+-------+---+
|1453| 536111|  1|
|1622| 139685|  6|
|1622| 182118|  6|
|1622| 127955|  6|
|1622| 727761|  6|
|1622| 155875|  6|
|1622| 139684|  6|
|3837|3224815|  2|
|3837|1504923|  2|
+----+-------+---+
scala> var df2 =df1.selectExpr("*").filter('r ===1).drop("r").union(df1.filter('r =!= 1).groupBy("id").agg(collect_list($"value").cast("string").as("value")))


scala> df2.show(false)
+----+------------------------------------------------+
|id  |value                                           |
+----+------------------------------------------------+
|1453|536111                                          |
|1622|[139685, 182118, 127955, 727761, 155875, 139684]|
|3837|[3224815, 1504923]                              |
+----+------------------------------------------------+
scala> df2.printSchema
root
 |-- id: integer (nullable = false)
 |-- value: string (nullable = true)

дайте мне знать, если у вас есть какие-либо вопросы, связанные с тем же.

...