Это два возможных решения.
Пример входных данных:
import spark.implicits._
val df = Seq(
("ABCD",1),
("XYZA",1),
("GFFD",1),
("NAAS",1),
("ABCD",2),
("XYZA",2),
("NAAS",2),
("VDDE",2),
("EXAMPLE", 20)
).toDF("name", "target")
df.show()
+-------+------+
| name|target|
+-------+------+
| ABCD| 1|
| XYZA| 1|
| GFFD| 1|
| NAAS| 1|
| ABCD| 2|
| XYZA| 2|
| NAAS| 2|
| VDDE| 2|
|EXAMPLE| 20|
+-------+------+
1 - возврат только ненулевых вхождений с использованием map
.
case class DataItem(name: String, target: Int)
df.as[DataItem]
.groupByKey(_.name)
.mapGroups{
case (nameKey, targetIter) =>{
val targetList = targetIter.map(_.target).toSeq
val occMap = targetList.groupBy(a=>a).mapValues(_.size)
(nameKey, occMap)
}
}
.toDF("name", "target_count").show()
+-------+----------------+
| name| target_count|
+-------+----------------+
| VDDE| [2 -> 1]|
| NAAS|[2 -> 1, 1 -> 1]|
|EXAMPLE| [20 -> 1]|
| GFFD| [1 -> 1]|
| XYZA|[2 -> 1, 1 -> 1]|
| ABCD|[2 -> 1, 1 -> 1]|
+-------+----------------+
2 - Использование списка для отображения вхождений (включая 0), где индекс = target_value.
case class DataItem(name: String, target: Int)
df.as[DataItem]
.groupByKey(_.name)
.mapGroups{
case (nameKey, targetIter) =>{
val targetList = targetIter.map(_.target).toSeq
val occMap = targetList.groupBy(a=>a).mapValues(_.size)
val maxTarget = occMap.maxBy(_._2)._1
val occList = for (i <- 1 until maxTarget+1) yield occMap.getOrElse(i, 0)
(nameKey, occList)
}
}
.toDF("name", "target_count").show(20, false)
+-------+------------------------------------------------------------+
|name |target_count |
+-------+------------------------------------------------------------+
|VDDE |[0, 1] |
|NAAS |[1, 1] |
|EXAMPLE|[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]|
|GFFD |[1] |
|XYZA |[1, 1] |
|ABCD |[1, 1] |
+-------+------------------------------------------------------------+