Я работаю в среде Microsoft Azure Databricks, используя spark sql и pyspark. Итак, у меня есть дельта-таблица на озере, где данные разделены, скажем, file_date. Каждый раздел содержит файлы, в которых хранятся миллионы записей в день без первичного / уникального ключа. Все эти записи имеют столбец «статус», который может содержать либо значения NULL (если все выглядит хорошо в этой конкретной c записи), либо Not null (скажем, если конкретное отображение поиска для определенного столбца не найдено). Кроме того, в моем процессе есть еще одна папка, называемая «сопоставление», которая обновляется периодически c, скажем, каждую ночь, чтобы упростить задачу, откуда и обнаруживаются сопоставления.
Ежедневно существует хороший шанс, что около 100 ~ 200 строк получат ошибку (столбец состояния, содержащий ненулевые значения). Из этих файлов ежедневно (отсюда и раздел по file_date) последующее задание извлекает все действительные записи и отправляет их для дальнейшей обработки, игнорируя эти 100-200 записей с ошибками, ожидая получения правильного файла сопоставления. Последующее задание, в дополнение к действительным записям статуса, должно также попытаться проверить, найдено ли сопоставление для записей с ошибками, и, если оно есть, также удалить его (после, конечно, обновления озера данных с помощью соответствующего сопоставления и статус).
Как лучше всего набрать go? Наилучший способ - сначала напрямую обновить дельта-таблицу / озеро с правильным отображением и обновить столбец статуса, чтобы сказать «available_for_reprocessing» и мое последующее задание, извлечь действительные данные за день + вытащить данные «available_for_reprocessing» и после обработки, обновить обратно со статусом "обработано". Но это кажется очень сложным с использованием дельты.
Я смотрел на «https://docs.databricks.com/delta/delta-update.html», а в приведенном там примере обновления просто приводится пример простого обновления с константами для обновления, не для обновлений из нескольких таблиц.
Другой, но наиболее неэффективный - это, скажем, вытащить ВСЕ данные (как обработанные, так и с ошибками) за последние, скажем, 30 дней, получить отображение для записей с ошибками и записать фрейм данных обратно в дельту озера, используя опцию replaceWhere. Это очень неэффективно, поскольку мы читаем все (сотни миллионов записей) и записываем все обратно, чтобы обработать, скажем, не более 1000 записей. Если вы ищете deltaTable = DeltaTable.forPath(spark, "/data/events/")
по адресу «https://docs.databricks.com/delta/delta-update.html», приведенный пример предназначен для очень простых обновлений. Без уникального ключа невозможно обновить и указанные c записи. Может кто-нибудь помочь?
Я использую pyspark или могу использовать Spark sql, но я потерялся