pyspark Потоковая агрегация на основе меньших - PullRequest
0 голосов
/ 22 апреля 2020

У меня есть потоковый фрейм данных, приходящий с топика Кафки c. Я использую поле timestamp для подсчета записей за 5 минут windows, как показано ниже:

.groupBy(window("timestamp","5 minutes"), "timestamp") \
.count()

Теперь на отдельном шаге я хочу использовать эти 5 минут windows для вычисления ежедневного счета , Если я попытаюсь сделать это на том же шаге, я получу исключение

Несколько потоковых агрегатов не поддерживаются потоковыми наборами данных / наборами данных

Для решения этой проблемы я пишу 5-минутный windows фрейм данных для другого Kafka topi c выглядит следующим образом:

.writeStream \
            .outputMode("update") \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("topic", "windowed-counts") \
            .start()

и я использую другой потоковый фрейм данных, чтобы прочитать этот поток и выполнить ежедневную агрегацию на основе 5 минут, суммируя счетчик windows выглядит следующим образом:

.withColumn("timestamp_day", to_date(col("timestamp"))) \
.groupBy(window("timestamp_day","1 day")) \
.agg(sum("count"))

Теперь проблема заключается в том, что оба режима вывода update и complete, которые публикуются в Kafka, представляют собой обновленные 5 минут windows счетчики. Это означает, что если для пакета у меня есть 4 записи в моем 5-минутном окне x:

{"window":"x","timestamp":"y","count":4}

И в следующей партии у меня есть еще один добавленный , следующее написано в Кафке сток:

{"window":"x","timestamp":"y","count":5}

Поэтому ежедневная агрегация насчитывает 9, а не 5.

Я думаю, я мог бы снова сгруппировать и получить максимальное значение поля count, а затем суммировать, используя эти максимальные значения. Однако для этого потребуется использовать еще одну промежуточную Kafka topi c, поскольку при попытке сделать это я получаю ту же ошибку

Несколько потоковых агрегатов не поддерживаются потоковыми наборами данных / наборами данных

...