В pyspark нет melt
или pivot
API, которые бы выполняли sh напрямую. Вместо этого выведите плоскость из RDD в новый фрейм данных и агрегируйте:
df.show()
+---+---+---+
|123|124|125|
+---+---+---+
| 1| 2| 3|
| 9| 9| 4|
| 4| 12| 1|
| 2| 4| 8|
| 7| 6| 3|
| 19| 11| 2|
| 21| 10| 10|
+---+---+---+
Для каждого столбца или каждой строки в RDD выведите строку с двумя столбцами: значением столбца и именем столбца:
cols = df.columns
(df.rdd
.flatMap(lambda row: [(row[c], c) for c in cols]).toDF(["value", "column_name"])
.show())
+-----+-----------+
|value|column_name|
+-----+-----------+
| 1| 123|
| 2| 124|
| 3| 125|
| 9| 123|
| 9| 124|
| 4| 125|
| 4| 123|
| 12| 124|
| 1| 125|
| 2| 123|
| 4| 124|
| 8| 125|
| 7| 123|
| 6| 124|
| 3| 125|
| 19| 123|
| 11| 124|
| 2| 125|
| 21| 123|
| 10| 124|
+-----+-----------+
Затем сгруппируйте по значению и объедините имена столбцов в список:
from pyspark.sql import functions as f
(df.rdd
.flatMap(lambda row: [(row[c], c) for c in cols]).toDF(["value", "column_name"])
.groupby("value").agg(f.collect_list("column_name"))
.show())
+-----+-------------------------+
|value|collect_list(column_name)|
+-----+-------------------------+
| 19| [123]|
| 7| [123]|
| 6| [124]|
| 9| [123, 124]|
| 1| [123, 125]|
| 10| [124, 125]|
| 3| [125, 125]|
| 12| [124]|
| 8| [125]|
| 11| [124]|
| 2| [124, 123, 125]|
| 4| [125, 123, 124]|
| 21| [123]|
+-----+-------------------------+