В зависимости от того, как вы хотите агрегировать сгруппированные данные - вы можете сделать, например,
Предпосылки (если вы их еще не установили):
from pyspark.sql import functions as F
from pyspark.sql.functions import *
Для sum
:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))
Для max
:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))
А затем:
differenceStream = difference.writeStream\
.queryName("diff_aggr")\
.format("memory").outputMode("append")\
.start()
Дело в том - если вы делаете groupBy
, вам также необходимо уменьшить путем агрегирования. Если вы хотите отсортировать значения вместе, попробуйте df.sort(...)