Я видел, что вы используете блоки данных в стеке azure. Я думаю, что наиболее жизнеспособным и рекомендуемым для вас способом было бы использование нового проекта дельта-озера в блоках данных:
Он предоставляет варианты для различных upserts, merges и кислотные транзакции для хранилищ объектов, таких как хранилище данных s3 или azure. Он в основном обеспечивает управление, безопасность, изоляцию и поддержку / слияние, предоставляемое хранилищами данных для зон данных. Для одного конвейера Apple фактически заменила свои хранилища данных, которые будут работать исключительно на дельта-блоках данных из-за своей функциональности и гибкости. Для вашего случая использования и многих других, которые используют паркет, это просто простая замена замены «паркета» на «дельта» , чтобы использовать его функциональность (если у вас есть блоки данных). Delta - это, по сути, естественная эволюция из паркета , и блоки данных отлично справились с задачей, предоставив дополнительные функциональные возможности и предоставив их с открытым исходным кодом.
Для вашего случая я предложил бы вам попробовать опцию replaceWhere , предоставленную в delta. Перед выполнением этого целевого обновления таблица назначения должна иметь формат delta
Вместо этого:
dataset.repartition(1).write.mode('overwrite')\
.partitionBy('Year','Week').parquet('\curataed\dataset')
С https://docs.databricks.com/delta/delta-batch.html:
'Вы можете выборочно перезаписать только те данные, которые соответствуют предикатам над столбцами секций '
Вы можете попробовать это:
dataset.write.repartition(1)\
.format("delta")\
.mode("overwrite")\
.partitionBy('Year','Week')\
.option("replaceWhere", "Year == '2019' AND Week >='01' AND Week <='02'")\ #to avoid overwriting Week3
.save("\curataed\dataset")
Кроме того, если вы хотите sh, чтобы привести разделы к 1, почему бы вам не использовать coalesce (1) поскольку это позволит избежать полного перемешивания.
С https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/:
' replaceWhere особенно полезно, когда вам нужно запустить вычислительно дорогой алгоритм , но только на определенных разделах '
Поэтому я лично считаю, что использование замены в другом месте для ручного указания перезаписи будет более целенаправленным и вычислительно эффективнее, чем просто полагаться: spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
Блоки данных обеспечивают оптимизацию дельта-таблиц, ускоряют и Эффективный вариант рукетирования для паркета (следовательно, естественная эволюция) путем упаковки бункера и z-заказа:
Из ссылки: https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html
'Оптимизировать подмножество строк, соответствующих предикату секционирования. Поддерживаются только фильтры, использующие атрибуты ключа раздела. '
' Объединить информацию столбца в одном наборе файлов. Совместное использование используется алгоритмами пропуска данных Delta Lake для значительного сокращения объема данных, которые необходимо прочитать ».
Ускоренное выполнение запроса с поддержкой индексации, статистики и автоматического кэширования
Надежность данных с расширенной проверкой схемы и гарантиями транзакций
Упрощенный конвейер данных с гибкой поддержкой UPSERT и унифицированной структурированной потоковой передачей + пакетная обработка на одном источнике данных
Вы также можете ознакомиться с полной документацией проекта с открытым исходным кодом: https://docs.delta.io/latest/index.html
.. Я также хочу сказать, что я не работаю для блоков данных / дельта-лейк. Я только что видел, как их улучшения и функциональность приносят мне пользу в моей работе.
ОБНОВЛЕНИЕ:
Суть вопроса " замена существующих данных и создание новых папок для новых данных " и выполнение этого с высокой степенью масштабируемости и эффективности.
Использование перезаписи разделов Dynami c в паркете делает свою работу, однако я чувствую себя естественным Эволюция этого метода заключается в использовании операций слияния дельта-таблиц, которые были в основном созданы для 'интеграции данных из Spark DataFrames в Delta Lake' . Они предоставляют вам дополнительную функциональность и оптимизацию при объединении ваших данных, основываясь на том, как бы этого хотелось, и ведут журнал всех действий в таблице, чтобы вы могли откатить версии при необходимости.
Delta lake python API (для слияния): https://docs.delta.io/latest/api/python/index.html#delta .tables.DeltaMergeBuilder
Оптимизация блоков данных: https://kb.databricks.com/delta/delta-merge-into.html#discussion
Использование В одной операции слияния вы можете указать условие слияния, в данном случае это может быть комбинация года и недели и идентификатора, а затем, если записи совпадают (это означает, что они существуют в вашем искровом фрейме данных и дельта-таблице, week1 и week2), обновите их с данными в вашем искровом фрейме данных и оставьте другие записи без изменений:
#you can also add additional condition if the records match, but not required
.whenMatchedUpdateAll(condition=None)
В некоторых случаях, если ничего не совпадает, вы можете вставить и создать новые строки и разделы, для которых вы можете использовать:
.whenNotMatchedInsertAll(condition=None)
Вы можете использовать. converttodelta операция https://docs.delta.io/latest/api/python/index.html#delta .tables.DeltaTable.convertToDelta , чтобы преобразовать ваш паркетный стол в дельта-таблица, чтобы вы могли выполнять дельта-операции над ней с помощью API.
'Теперь вы можете преобразовать таблицу Parquet на месте в таблицу Delta Lake без перезаписи каких-либо данных. Это отлично подходит для преобразования очень больших паркетных столов, которые стоило бы переписать в виде таблицы Delta. Кроме того, этот процесс является обратимым:
Ваш случай слияния ( замена данных там, где они существуют, и создание новых записей, когда их не существует ) могут go вот так:
(не тестировал, см. примеры + api для синтаксиса)
%python
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`\curataed\dataset`")
deltaTable.alias("target").merge(dataset, "target.Year= dataset.Year AND target.Week = dataset.Week") \
.whenMatchedUpdateAll()\
.whenNotMatchedInsertAll()\
.execute()
Если дельта-таблица разделена правильно (год, неделя) и вы правильно использовали предложение whenmatched Эти операции будут оптимизированы и могут занять несколько секунд в вашем случае. Он также обеспечивает согласованность, атомарность и целостность данных с возможностью отката.
Некоторые дополнительные функции включают в себя возможность указания набора столбцов для обновления в случае совпадения (если вам нужно только обновить определенные столбцы). Вы также можете включить spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
, чтобы delta использовала минимальные целевые разделы для выполнения слияния (обновления, удаления, создания).
В целом, я думаю, что использование этого подхода является очень новым и инновационным способом переноса. целевые обновления, поскольку это дает вам больший контроль над ним, сохраняя при этом высокую эффективность операций. Использование партера с режимом динамического c разбиения и перезаписи также будет работать нормально, однако функции Delta Lake привносят качество данных в ваше озеро данных, которое не имеет аналогов.
Моя рекомендация: Я бы сказал, что сейчас используйте динамический c режим перезаписи разделов для файлов паркета, чтобы выполнять свои обновления, и вы можете поэкспериментировать и попробовать использовать дельта-слияние только для одной таблицы с оптимизацией блоков данных spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
и .whenMatchedUpdateAll()
и сравните производительность обоих (ваши файлы маленькие, поэтому я не думаю, что это будет большой разницей). Статья по оптимизации удаления разделов из базы данных для слияний вышла в феврале, так что она действительно новая и, возможно, может стать игровым заменителем для накладных операций дельта-слияния (поскольку под капотом они просто создают новые файлы, но сокращение раздела может ускорить это)
Примеры слияния в python, scala, sql: https://docs.databricks.com/delta/delta-update.html#merge - примеры
https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html