Check this solution for Scala.
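The session below assumes spark-shell, which auto-imports spark.implicits._ and org.apache.spark.sql.functions._. If you run it as a standalone application instead, a minimal setup sketch would look like the following (the app name and master are placeholders, not part of the original session):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// hypothetical standalone entry point; spark-shell provides an equivalent `spark` automatically
val spark = SparkSession.builder().appName("panel-demo").master("local[*]").getOrCreate()
import spark.implicits._  // enables Seq(...).toDF(...)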
scala> val df = Seq(("s1",0,1,0,1,1,0,1),
| ("s2",1,0,0,0,0,0,0),
| ("s3",1,1,0,0,0,0,0),
| ("s4",0,1,0,1,1,0,0)).toDF("key","A","B","C","D","E","F","G")
df: org.apache.spark.sql.DataFrame = [key: string, A: int ... 6 more fields]
scala> df.show
+---+---+---+---+---+---+---+---+
|key| A| B| C| D| E| F| G|
+---+---+---+---+---+---+---+---+
| s1| 0| 1| 0| 1| 1| 0| 1|
| s2| 1| 0| 0| 0| 0| 0| 0|
| s3| 1| 1| 0| 0| 0| 0| 0|
| s4| 0| 1| 0| 1| 1| 0| 0|
+---+---+---+---+---+---+---+---+
scala> val columns = df.columns.filter(x=>x != "key")
columns: Array[String] = Array(A, B, C, D, E, F, G)
scala> val p1 = columns.map( x => when(col(x)===lit(1),x+",").otherwise(lit(""))).reduce(concat(_,_)).as("panel")
p1: org.apache.spark.sql.Column = concat(concat(concat(concat(concat(concat(CASE WHEN (A = 1) THEN A, ELSE END, CASE WHEN (B = 1) THEN B, ELSE END), CASE WHEN (C = 1) THEN C, ELSE END), CASE WHEN (D = 1) THEN D, ELSE END), CASE WHEN (E = 1) THEN E, ELSE END), CASE WHEN (F = 1) THEN F, ELSE END), CASE WHEN (G = 1) THEN G, ELSE END) AS `panel`
scala> df.select(p1).show(false)
+--------+
|panel |
+--------+
|B,D,E,G,|
|A, |
|A,B, |
|B,D,E, |
+--------+
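Since concat accepts a varargs list of columns, the reduce over pairwise concat can be collapsed into a single call; a minimal equivalent sketch (p1Flat is a name introduced here) that yields the same panel column with a flatter expression tree:

// same when(...) per column, but one flat concat instead of nested ones
val p1Flat = concat(columns.map(x => when(col(x) === lit(1), x + ",").otherwise(lit(""))): _*).as("panel")
df.select(p1Flat).show(false)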
With all the columns:
scala> df.select(col("*"), p1).show
+---+---+---+---+---+---+---+---+--------+
|key| A| B| C| D| E| F| G| panel|
+---+---+---+---+---+---+---+---+--------+
| s1| 0| 1| 0| 1| 1| 0| 1|B,D,E,G,|
| s2| 1| 0| 0| 0| 0| 0| 0| A,|
| s3| 1| 1| 0| 0| 0| 0| 0| A,B,|
| s4| 0| 1| 0| 1| 1| 0| 0| B,D,E,|
+---+---+---+---+---+---+---+---+--------+
The result has a trailing comma, which can be removed with:
scala> df.select(col("*"), regexp_replace(p1,",$","").as("panel")).show
+---+---+---+---+---+---+---+---+-------+
|key| A| B| C| D| E| F| G| panel|
+---+---+---+---+---+---+---+---+-------+
| s1| 0| 1| 0| 1| 1| 0| 1|B,D,E,G|
| s2| 1| 0| 0| 0| 0| 0| 0| A|
| s3| 1| 1| 0| 0| 0| 0| 0| A,B|
| s4| 0| 1| 0| 1| 1| 0| 0| B,D,E|
+---+---+---+---+---+---+---+---+-------+
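If you would rather avoid the regex, rtrim with an explicit trim string (available in org.apache.spark.sql.functions since Spark 2.3) strips the trailing comma just as well; a sketch assuming that version or later:

// rtrim(column, trimString) removes trailing "," characters
df.select(col("*"), rtrim(p1, ",").as("panel")).show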
EDIT2:
A cleaner approach is to use the array() function together with concat_ws(), which skips null entries (note that columns is reused from the session above).
scala> val df = Seq(("s1",0,1,0,1,1,0,1),("s2",1,0,0,0,0,0,0),("s3",1,1,0,0,0,0,0),("s4",0,1,0,1,1,0,0)).toDF("key","A","B","C","D","E","F","G")
df: org.apache.spark.sql.DataFrame = [key: string, A: int ... 6 more fields]
scala> df.show(false)
+---+---+---+---+---+---+---+---+
|key|A |B |C |D |E |F |G |
+---+---+---+---+---+---+---+---+
|s1 |0 |1 |0 |1 |1 |0 |1 |
|s2 |1 |0 |0 |0 |0 |0 |0 |
|s3 |1 |1 |0 |0 |0 |0 |0 |
|s4 |0 |1 |0 |1 |1 |0 |0 |
+---+---+---+---+---+---+---+---+
scala> val p1 = columns.map( x => when(col(x)===lit(1),x).otherwise(null))
p1: Array[org.apache.spark.sql.Column] = Array(CASE WHEN (A = 1) THEN A ELSE NULL END, CASE WHEN (B = 1) THEN B ELSE NULL END, CASE WHEN (C = 1) THEN C ELSE NULL END, CASE WHEN (D = 1) THEN D ELSE NULL END, CASE WHEN (E = 1) THEN E ELSE NULL END, CASE WHEN (F = 1) THEN F ELSE NULL END, CASE WHEN (G = 1) THEN G ELSE NULL END)
scala> df.select(col("*"),array(p1:_*).alias("panel")).withColumn("panel2",concat_ws(",",'panel)).show(false)
+---+---+---+---+---+---+---+---+----------------+-------+
|key|A |B |C |D |E |F |G |panel |panel2 |
+---+---+---+---+---+---+---+---+----------------+-------+
|s1 |0 |1 |0 |1 |1 |0 |1 |[, B,, D, E,, G]|B,D,E,G|
|s2 |1 |0 |0 |0 |0 |0 |0 |[A,,,,,,] |A |
|s3 |1 |1 |0 |0 |0 |0 |0 |[A, B,,,,,] |A,B |
|s4 |0 |1 |0 |1 |1 |0 |0 |[, B,, D, E,,] |B,D,E |
+---+---+---+---+---+---+---+---+----------------+-------+
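concat_ws already skips the null slots, which is why panel2 comes out clean. If you also want the intermediate panel array itself without the nulls, the filter higher-order function (SQL form, available since Spark 2.4) can drop them; a sketch assuming that version:

// filter(panel, x -> x is not null) keeps only the matched column names
df.select(col("*"), array(p1: _*).alias("panel"))
  .withColumn("panel", expr("filter(panel, x -> x is not null)"))
  .withColumn("panel2", concat_ws(",", col("panel")))
  .show(false)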