Обновления дельта-таблицы Spark - PullRequest
1 голос
/ 26 мая 2020

Я работаю в среде Microsoft Azure Databricks, используя spark sql и pyspark. Итак, у меня есть дельта-таблица на озере, где данные разделены, скажем, file_date. Каждый раздел содержит файлы, в которых хранятся миллионы записей в день без первичного / уникального ключа. Все эти записи имеют столбец «статус», который может содержать либо значения NULL (если все выглядит хорошо в этой конкретной c записи), либо Not null (скажем, если конкретное отображение поиска для определенного столбца не найдено). Кроме того, в моем процессе есть еще одна папка, называемая «сопоставление», которая обновляется периодически c, скажем, каждую ночь, чтобы упростить задачу, откуда и обнаруживаются сопоставления.

Ежедневно существует хороший шанс, что около 100 ~ 200 строк получат ошибку (столбец состояния, содержащий ненулевые значения). Из этих файлов ежедневно (отсюда и раздел по file_date) последующее задание извлекает все действительные записи и отправляет их для дальнейшей обработки, игнорируя эти 100-200 записей с ошибками, ожидая получения правильного файла сопоставления. Последующее задание, в дополнение к действительным записям статуса, должно также попытаться проверить, найдено ли сопоставление для записей с ошибками, и, если оно есть, также удалить его (после, конечно, обновления озера данных с помощью соответствующего сопоставления и статус).

Как лучше всего набрать go? Наилучший способ - сначала напрямую обновить дельта-таблицу / озеро с правильным отображением и обновить столбец статуса, чтобы сказать «available_for_reprocessing» и мое последующее задание, извлечь действительные данные за день + вытащить данные «available_for_reprocessing» и после обработки, обновить обратно со статусом "обработано". Но это кажется очень сложным с использованием дельты.

Я смотрел на «https://docs.databricks.com/delta/delta-update.html», а в приведенном там примере обновления просто приводится пример простого обновления с константами для обновления, не для обновлений из нескольких таблиц.

Другой, но наиболее неэффективный - это, скажем, вытащить ВСЕ данные (как обработанные, так и с ошибками) за последние, скажем, 30 дней, получить отображение для записей с ошибками и записать фрейм данных обратно в дельту озера, используя опцию replaceWhere. Это очень неэффективно, поскольку мы читаем все (сотни миллионов записей) и записываем все обратно, чтобы обработать, скажем, не более 1000 записей. Если вы ищете deltaTable = DeltaTable.forPath(spark, "/data/events/") по адресу «https://docs.databricks.com/delta/delta-update.html», приведенный пример предназначен для очень простых обновлений. Без уникального ключа невозможно обновить и указанные c записи. Может кто-нибудь помочь?

Я использую pyspark или могу использовать Spark sql, но я потерялся

Ответы [ 2 ]

1 голос
/ 26 мая 2020

Если вы хотите обновить 1 столбец ('статус') при условии, что все поиски теперь верны для строк, где они не были правильными раньше (где 'статус' в настоящее время неверен), я думаю, что команда UPDATE вместе с EXISTS может помочь вам решить эту проблему. Он не упоминается в документации по обновлению, но работает как для операций удаления, так и для операций обновления, эффективно позволяя обновлять / удалять записи при объединениях.

Для вашего сценария я считаю, что команда sql могла бы что-то найти вот так:

UPDATE your_db.table_name AS a 
SET staus = 'correct'
  WHERE EXISTS 
  (
    SELECT * 
    FROM your_db.table_name AS b 
    JOIN lookup_table_1 AS t1 ON t1.lookup_column_a = b.lookup_column_a
    JOIN lookup_table_2 AS t2 ON t2.lookup_column_b = b.lookup_column_b
    -- ... add further lookups if needed
    WHERE
    b.staus = 'incorrect' AND
    a.lookup_column_a = b.lookup_column_a AND 
    a.lookup_column_b = b.lookup_column_b
  )
0 голосов
/ 27 мая 2020

Слияние сделало свое дело ...

MERGE INTO deptdelta AS maindept ИСПОЛЬЗОВАНИЕ updated_dept_location AS upddept ON upddept.dno = maindept.dno КОГДА СООТВЕТСТВУЕТ ЗАТЕМ ОБНОВЛЕНИЕ УСТАНОВИТЕ maindept.dname = upddept.updated_name, maindept.location = upddept.updated_location

...