Потоковая передача данных в фрейм данных, но только обработка того, что недавно поступило - PullRequest
0 голосов
/ 23 октября 2019

Используя Spark и Databricks, у меня есть (дельта) таблица, которая постоянно заполняется данными журнала.

Теперь я пытаюсь читать данные в пакетном режиме, например, каждые 2 минуты, и обрабатывать только то, чтоновый / добавленный с момента последней партии. Я пытался использовать структурированную потоковую передачу, но я борюсь (плохая документация или это только я?). Строки имеют поле метки времени, но по нескольким причинам я хотел бы воздействовать на поступающие данные, а не полагаться на метки времени. Следовательно, выбор, основанный на time / timedelta, не является вариантом.

Я попытался выполнить следующее:

myInput = spark.readStream.table(tablename).where("...").select(fields)
myStream = myInput.writeStream.foreachBatch(myForeachMethod).trigger(processingTime="2 minutes")
myStream.start()

display(myStream)

Задание получает данные каждые 2 минуты. Метод display () дает фрейм данных с данными, но он растёт / дополняет вместо того, чтобы очищать старые и давать только новые данные. Также мой пользовательский метод (myForeachMethod ()), кажется, получает полный фрейм данных вместо только новых данных.

Итак, мой вопрос - есть ли способ (пакетный) процесс обработки только нового с момента последнего триггера?

...