Добавлять только новые агрегаты на основе групповых ключей - PullRequest
0 голосов
/ 25 сентября 2019

Я должен обработать некоторые файлы, которые приходят ко мне ежедневно.Информация имеет первичный ключ (date,client_id,operation_id).Поэтому я создал поток, который добавляет только новые данные в дельта-таблицу:

operations\
        .repartition('date')\
        .writeStream\
        .outputMode('append')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/operations')

Это работает нормально, но мне нужно суммировать эту информацию, сгруппированную по (date,client_id), поэтому я создал другой поток из этих операцийtable to new table:

summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')

summarized= summarized.groupBy('client_id','date').agg(<a lot of aggs>)

summarized.repartition('date')\
        .writeStream\
        .outputMode('complete')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/summarized')

Это работает, но каждый раз, когда я получаю новые данные в таблицу operations, spark пересчитывает summarized снова и снова.Я пытался использовать режим добавления во второй потоковой передаче, но для этого нужны водяные знаки, а дата - DateType.

Существует способ расчета только новых агрегатов на основе ключей группы и добавления их к * 1013.* * * 1014

1 Ответ

1 голос
/ 25 сентября 2019

Вам необходимо использовать Spark Structured Streaming - Операции с окнами

Когда вы используете операции с окнами, он будет выполнять разбивку в соответствии с windowDuration и slideDuration.windowDuration говорит вам, какова длина окна, а slideDuration сообщает, сколько времени вы должны сдвинуть окно.

Если вы группируете, используя window () [документы] , вы получите результирующий столбец window вместе с другими столбцами, с которыми вы группируете, например client_id

Дляпример:

windowDuration = "10 minutes"
slideDuration = "5 minutes"
summarized = before_summary.groupBy(before_summary.client_id,
    window(before_summary.date, windowDuration, slideDuration)
).agg(<a lot of aggs>).orderBy('window')
...