Используя Spark и Databricks, у меня есть (дельта) таблица, которая постоянно заполняется данными журнала.
Теперь я пытаюсь читать данные в пакетном режиме, например, каждые 2 минуты, и обрабатывать только то, чтоновый / добавленный с момента последней партии. Я пытался использовать структурированную потоковую передачу, но я борюсь (плохая документация или это только я?). Строки имеют поле метки времени, но по нескольким причинам я хотел бы воздействовать на поступающие данные, а не полагаться на метки времени. Следовательно, выбор, основанный на time / timedelta, не является вариантом.
Я попытался выполнить следующее:
myInput = spark.readStream.table(tablename).where("...").select(fields)
myStream = myInput.writeStream.foreachBatch(myForeachMethod).trigger(processingTime="2 minutes")
myStream.start()
display(myStream)
Задание получает данные каждые 2 минуты. Метод display () дает фрейм данных с данными, но он растёт / дополняет вместо того, чтобы очищать старые и давать только новые данные. Также мой пользовательский метод (myForeachMethod ()), кажется, получает полный фрейм данных вместо только новых данных.
Итак, мой вопрос - есть ли способ (пакетный) процесс обработки только нового с момента последнего триггера?