Я должен обработать некоторые файлы, которые приходят ко мне ежедневно.Информация имеет первичный ключ (date,client_id,operation_id)
.Поэтому я создал поток, который добавляет только новые данные в дельта-таблицу:
operations\
.repartition('date')\
.writeStream\
.outputMode('append')\
.trigger(once=True)\
.option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
.format('delta')\
.partitionBy('date')\
.start('/mnt/sandbox/operations')
Это работает нормально, но мне нужно суммировать эту информацию, сгруппированную по (date,client_id)
, поэтому я создал другой поток из этих операцийtable to new table:
summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized= summarized.groupBy('client_id','date').agg(<a lot of aggs>)
summarized.repartition('date')\
.writeStream\
.outputMode('complete')\
.trigger(once=True)\
.option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
.format('delta')\
.partitionBy('date')\
.start('/mnt/sandbox/summarized')
Это работает, но каждый раз, когда я получаю новые данные в таблицу operations
, spark пересчитывает summarized
снова и снова.Я пытался использовать режим добавления во второй потоковой передаче, но для этого нужны водяные знаки, а дата - DateType.
Существует способ расчета только новых агрегатов на основе ключей группы и добавления их к * 1013.* * * 1014