У меня есть таблица дельты озера бронзового уровня (events_bronze) в расположении "/ mnt / events-bronze", в которую передаются данные из kafka. Теперь я хочу иметь возможность потоковой передачи из этой таблицы и обновления с использованием «foreachBatch» в серебряную таблицу (events_silver ". Это может быть достигнуто с использованием бронзовой таблицы в качестве источника. Однако во время первоначального запуска, так как events_silver не существует, явсе время получаю сообщение об ошибке, утверждая, что дельта-таблицы не существует, что очевидно. Итак, как мне перейти к созданию events_silver, который имеет ту же структуру, что и events_bronze? Мне не удалось найти DDL для того же.
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
.merge(
microBatchOutputDF.as("bronze"),
"silver.id=bronze.id")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
events_bronze
.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Во время первоначального запуска проблема заключается в том, что для пути "/ mnt / events-silver" не определена таблица дельты озера. Я не уверен, как создать ее, имеющую такую же структуру, как "/ mnt / events-bronze" дляпервый запуск.