Запрос для потокового набора данных в Spark - PullRequest
1 голос
/ 17 января 2020

У меня есть потоковый Набор данных со столбцами: bag_id, ball_color. Я хочу найти самый популярный цвет для каждой сумки. Итак, я попытался:

dataset.groupBy("bag_id", "color") # 1st aggregation
       .agg(count("color").as("color_count"))
       .groupBy("bag_id") # 2nd aggregation
       .agg(max("color_count"))

Но у меня была ошибка:

Исключение в потоке "главная" организация. apache .spark. sql .AnalysisException: Несколько потоковые агрегаты не поддерживаются потоковыми наборами данных / наборами данных ;;

Можно ли создать правильный запрос только с одной функцией агрегирования?

Ответы [ 2 ]

2 голосов
/ 19 января 2020

Да, в Spark 2.4.4 (последняя на данный момент) НЕ поддерживается пока несколько потоковых агрегатов. Но в качестве обходного пути вы можете использовать метод .foreachBatch() :

def foreach_batch_function(df, epoch_id):
  df.groupBy("bag_id","color")
  .agg(count("color").as("color_count"))
  .groupBy("bag_id").agg(max("color_count"))
  .show() # .show() is a dummy action

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()  

В .foreachBatch() df не является потоковым df, поэтому вы можете делать все, что хотите.

2 голосов
/ 18 января 2020

Существует открытая Jira, решающая эту проблему Spark-26655 , так как на данный момент мы не можем запустить несколько агрегатов для потоковых данных.

Один обходной путь - Выполнение one aggregation и сохранение обратно в Kafka..et c и снова считывание из kafka для выполнения другой агрегации.

(or)

Мы можем запустить только одну агрегацию для потоковых данных и сохранить ее в HDFS / Hive / HBase и извлечь для выполнения дополнительных агрегаций (это будет отдельная работа)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...