Да, в Spark 2.4.4 (последняя на данный момент) НЕ поддерживается пока несколько потоковых агрегатов. Но в качестве обходного пути вы можете использовать метод .foreachBatch()
:
def foreach_batch_function(df, epoch_id):
df.groupBy("bag_id","color")
.agg(count("color").as("color_count"))
.groupBy("bag_id").agg(max("color_count"))
.show() # .show() is a dummy action
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
В .foreachBatch()
df не является потоковым df, поэтому вы можете делать все, что хотите.