Pivot
- дорогостоящая операция shuffle
, и ее следует избегать , если возможно . Попробуйте использовать этот logi c с arrays_zip and explode
до динамически сворачивать столбцы и groupby-aggregate
.
from pyspark.sql import functions as F
df.withColumn("cols", F.explode(F.arrays_zip(F.array([F.array(F.col(x),F.lit(x))\
for x in df.columns if x!='id']))))\
.withColumn("name", F.col("cols.0")[1]).withColumn("val", F.col("cols.0")[0]).drop("cols")\
.groupBy("name").agg(F.count(F.when(F.col("val")=='diff',1)).alias("diff"),\
F.count(F.when(F.col("val")=='same',1)).alias("same")).orderBy("name").show()
#+----+----+----+
#|name|diff|same|
#+----+----+----+
#| c1| 2| 2|
#| c2| 0| 4|
#| c3| 1| 3|
#+----+----+----+
Вы также можете сделайте это с помощью exploding a map_type
, создав map dynamically
.
from pyspark.sql import functions as F
from itertools import chain
df.withColumn("cols", F.create_map(*(chain(*[(F.lit(name), F.col(name))\
for name in df.columns if name!='id']))))\
.select(F.explode("cols").alias("name","val"))\
.groupBy("name").agg(F.count(F.when(F.col("val")=='diff',1)).alias("diff"),\
F.count(F.when(F.col("val")=='same',1)).alias("same")).orderBy("name").show()
#+----+----+----+
#|name|diff|same|
#+----+----+----+
#| c1| 2| 2|
#| c2| 0| 4|
#| c3| 1| 3|
#+----+----+----+