Является ли это лучшим методом для загрузки и объединения данных в существующую таблицу дельта на кирпичи данных? - PullRequest
1 голос
/ 02 мая 2019

Я новичок в использовании Databricks и пытаюсь проверить правильность непрерывной загрузки почасового файла в основной файл, который будет использоваться для отчетов. Каждый почасовой файл занимает примерно 3-400 ГБ и содержит ~ 1-1.3b записей. Я хотел бы, чтобы основная таблица хранила данные за 48 часов, но мне действительно нужно только 6 часовых файлов, чтобы завершить просмотр моих данных.

Мой текущий процесс ниже, и, кажется, работает нормально. Почасовой файл csv хранится в Azure DataLake (Gen1), а основная таблица использует ADL Gen2 в качестве хранилища. Это лучшие варианты? Этот процесс кажется правильным или я делаю что-то ужасно неправильно? :)

csvdata = spark.read.format("csv").option("header","true").option("ignoreLeadingWhiteSpace","true").option("ignoreTrailingWhiteSpace","true").option("timestampFormat","yyyy-MM-dd HH:mm:ss.SSS").option("delimiter","|").option("inferSchema","true").option("mode","FAILFAST").csv("adl://pathToCsv").createOrReplaceTempView("tempdata").cache()

```sql Merge
MERGE INTO primaryTable
USING tempdata
ON primaryTable.UserGuid = tempdata.UserGuid AND primaryTable.OrgGuid = tempdata.OrgGuid
WHEN MATCHED AND cast(unix_timestamp(primaryTable.timestamp,'yyyy/MM/dd HH:mm:ss.SSS') AS timestamp) < cast(unix_timestamp(tempdata.timestamp,'yyyy/MM/dd HH:mm:ss.SSS') AS timestamp) THEN
  UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *
...