Используйте оконную функцию:
// Spark shell session: for each id, keep a single occurrence's value as a plain
// scalar, but collapse ids that occur multiple times into a list of all their
// values. The result column is cast to string so both branches share one schema.

// Window specification support (partitionBy / orderBy).
scala> import org.apache.spark.sql.expressions.Window

// Sample input: (id, value) pairs; id 1453 occurs once, 1622 six times, 3837 twice.
// NOTE(review): `var` is used for df/df1/df2 but they are never reassigned — `val` would be idiomatic.
scala> var df = Seq((1622, 139685),(1622, 182118),(1622, 127955),(3837,3224815),(1622, 727761),(1622, 155875),(3837,1504923),(1622, 139684),(1453, 536111)).toDF("id","value")
scala> df.show()
+----+-------+
| id| value|
+----+-------+
|1622| 139685|
|1622| 182118|
|1622| 127955|
|3837|3224815|
|1622| 727761|
|1622| 155875|
|3837|1504923|
|1622| 139684|
|1453| 536111|
+----+-------+

// Helper column r = number of rows sharing this row's id:
// count over a window partitioned by id, cast to int.
scala> var df1= df.withColumn("r",count($"id").over(Window.partitionBy("id").orderBy("id")).cast("int"))
scala> df1.show()
+----+-------+---+
| id| value| r|
+----+-------+---+
|1453| 536111| 1|
|1622| 139685| 6|
|1622| 182118| 6|
|1622| 127955| 6|
|1622| 727761| 6|
|1622| 155875| 6|
|1622| 139684| 6|
|3837|3224815| 2|
|3837|1504923| 2|
+----+-------+---+

// Branch 1: ids occurring exactly once — keep the row, drop the helper column r.
// Branch 2: ids occurring more than once — group and collect all values into a list.
// The collect_list result is cast to string so the union of both branches type-checks.
scala> var df2 =df1.selectExpr("*").filter('r ===1).drop("r").union(df1.filter('r =!= 1).groupBy("id").agg(collect_list($"value").cast("string").as("value")))
scala> df2.show(false)
+----+------------------------------------------------+
|id |value |
+----+------------------------------------------------+
|1453|536111 |
|1622|[139685, 182118, 127955, 727761, 155875, 139684]|
|3837|[3224815, 1504923] |
+----+------------------------------------------------+

// Final schema: `value` is a string in both branches (scalar text or the
// stringified list), which is what made the union possible.
scala> df2.printSchema
root
|-- id: integer (nullable = false)
|-- value: string (nullable = true)
Дайте мне знать, если у вас возникнут вопросы по этому решению.