Я должен обработать некоторые файлы, которые приходят ко мне ежедневно.Информация имеет первичный ключ (date, client_id, operation_id).Поэтому я создал поток, который добавляет только новые данные в дельта-таблицу:
operations\
.repartition('date')\
.writeStream\
.outputMode('append')\
.trigger(once=True)\
.option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
.format('delta')\
.partitionBy('date')\
.start('/mnt/sandbox/operations')
Это работает нормально, но мне нужно обобщить эту информацию, сгруппированную по (date, client_id), поэтому я создал другую потоковую передачу изэта таблица операций в новую таблицу.Поэтому я попытался преобразовать свое поле date
во временную метку, чтобы я мог использовать режим добавления при записи агрегированного потока:
import pyspark.sql.functions as F
summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized= summarized.withColumn('timestamp_date',F.to_timestamp('date'))
summarized= summarized.withWatermark('timestamp_date','1 second').groupBy('client_id','date','timestamp_date').agg(<lot of aggs>)
summarized\
.repartition('date')\
.writeStream\
.outputMode('append')\
.option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
.trigger(once=True)\
.format('delta')\
.partitionBy('date')\
.start('/mnt/sandbox/summarized')
Этот код выполняется, но ничего не записывает в приемник.
почему он не записывает результаты в сток?