Delta Lake Create Table со структурой, похожей на другую - PullRequest
0 голосов
/ 04 октября 2019

У меня есть таблица дельты озера бронзового уровня (events_bronze) в расположении "/ mnt / events-bronze", в которую передаются данные из kafka. Теперь я хочу иметь возможность потоковой передачи из этой таблицы и обновления с использованием «foreachBatch» в серебряную таблицу (events_silver ". Это может быть достигнуто с использованием бронзовой таблицы в качестве источника. Однако во время первоначального запуска, так как events_silver не существует, явсе время получаю сообщение об ошибке, утверждая, что дельта-таблицы не существует, что очевидно. Итак, как мне перейти к созданию events_silver, который имеет ту же структуру, что и events_bronze? Мне не удалось найти DDL для того же.

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
    .merge(
      microBatchOutputDF.as("bronze"),
      "silver.id=bronze.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
 events_bronze
      .writeStream
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .format("delta")
      .foreachBatch(upsertToDelta _)
      .outputMode("update")
      .start()

Во время первоначального запуска проблема заключается в том, что для пути "/ mnt / events-silver" не определена таблица дельты озера. Я не уверен, как создать ее, имеющую такую ​​же структуру, как "/ mnt / events-bronze" дляпервый запуск.

1 Ответ

0 голосов
/ 05 октября 2019

Вы можете проверить таблицу, используя иск SQL. Сначала запустите приведенный ниже искровой SQL, который даст определение таблицы бронзовой таблицы:

spark.sql("show create table event_bronze").show

После получения DDL просто измените местоположение на путь к серебряному столу и запустите этот оператор - искровой SQL.

Примечание. Используйте «создать таблицу, если она не существует ...», так как она не будет работать при одновременных запусках.

...