У меня есть сценарий, в котором я хочу запустить задание структурированной потоковой передачи Spark, чтобы прочитать исходный файл дельтаблоки данных и извлечь только вставки в исходный файл. Я хочу отфильтровать все обновления / удаления.
Я пытался следовать меньшему файлу, но код, похоже, не выполняет то, что я ожидал.
spark
.readStream
.format("delta")
.option("latestFirst","true")
.option("ignoreDeletes", "true")
.option("ignoreChanges","true")
.load("/mnt/data-lake/data/bronze/accounts")
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation","/mnt/data-lake/tmp/chkpnt_accounts_inserts")
.option("path","/mnt/data-lake/tmp/accounts_inserts")
.start()