У меня есть потоковый фрейм данных, приходящий с топика Кафки c. Я использую поле timestamp
для подсчета записей за 5 минут windows, как показано ниже:
.groupBy(window("timestamp","5 minutes"), "timestamp") \
.count()
Теперь на отдельном шаге я хочу использовать эти 5 минут windows для вычисления ежедневного счета , Если я попытаюсь сделать это на том же шаге, я получу исключение
Несколько потоковых агрегатов не поддерживаются потоковыми наборами данных / наборами данных
Для решения этой проблемы я пишу 5-минутный windows фрейм данных для другого Kafka topi c выглядит следующим образом:
.writeStream \
.outputMode("update") \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "windowed-counts") \
.start()
и я использую другой потоковый фрейм данных, чтобы прочитать этот поток и выполнить ежедневную агрегацию на основе 5 минут, суммируя счетчик windows выглядит следующим образом:
.withColumn("timestamp_day", to_date(col("timestamp"))) \
.groupBy(window("timestamp_day","1 day")) \
.agg(sum("count"))
Теперь проблема заключается в том, что оба режима вывода update
и complete
, которые публикуются в Kafka, представляют собой обновленные 5 минут windows счетчики. Это означает, что если для пакета у меня есть 4 записи в моем 5-минутном окне x:
{"window":"x","timestamp":"y","count":4}
И в следующей партии у меня есть еще один добавленный , следующее написано в Кафке сток:
{"window":"x","timestamp":"y","count":5}
Поэтому ежедневная агрегация насчитывает 9, а не 5.
Я думаю, я мог бы снова сгруппировать и получить максимальное значение поля count
, а затем суммировать, используя эти максимальные значения. Однако для этого потребуется использовать еще одну промежуточную Kafka topi c, поскольку при попытке сделать это я получаю ту же ошибку
Несколько потоковых агрегатов не поддерживаются потоковыми наборами данных / наборами данных