Стратегия partitionBy & overwrite в Azure DataLake с использованием PySpark в Databricks - PullRequest
4 голосов
/ 03 марта 2020

У меня есть простой процесс ETL в окружении Azure

хранилище BLOB-объектов> datafactory> rawatake raw> databricks> курируемый datalake> хранилище данных (основной ETL).

наборы данных для этого проекта не очень велики (~ 1 миллион строк 20 столбцов дают или берут), однако я хотел бы правильно разделить их в моем массиве данных как файлы Parquet.

в настоящее время я запускаю несколько простых логи c чтобы определить, где в моем озере должен находиться каждый файл на основе деловых календарей.

файлы смутно выглядят следующим образом

Year Week Data
2019 01   XXX
2019 02   XXX

Затем я делю данный файл на следующий формат, заменяя существующие данные и создавая новые папки для новых данных.

curated ---
           dataset --
                     Year 2019 
                              - Week 01 - file.pq + metadata
                              - Week 02 - file.pq + metadata
                              - Week 03 - file.pq + datadata #(pre existing file)

метаданные success и commit , которые автоматически генерируются.

для этой цели я использую следующий запрос в Pyspark 2.4.3

dataset.repartition(1).write.mode('overwrite')\
                         .partitionBy('Year','Week').parquet('\curataed\dataset')

теперь, если я использую эту команду самостоятельно, она перезапишет любые существующие данные в целевом разделе

, поэтому Week 03 будет потеряно.

использование spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic"), кажется, решает проблему и только перезаписывает целевые файлы, но мне интересно, если это лучший способ обработки файлов в моем озере данных?

также я нашел трудно найти какую-либо документацию по вышеупомянутой функции.

мой первый инстинкт был l oop над одним паркетом и запись каждого раздела вручную, что, хотя и дает мне больший контроль над именованием файлов и целевым разделением, это для всех намерений и целей, очень и очень медленный (и посылает сигналы тревоги, исходящие из pandas мышления)

моя следующая мысль будет записать каждый раздел в папку /tmp и переместить каждый файл паркета и затем замените файлы / создайте файлы, если это необходимо, используя запрос сверху. затем очистите папку /tmp во время создания какого-либо журнала метаданных.

Есть ли лучший способ / метод для этого?

Любое руководство будет высоко ценится.

конечная цель здесь - создать чистую и безопасную область для всех «кураторских» данных, при этом имея журнал файлов паркета, который я могу прочитать в DataWarehouse для дальнейшей ETL.

Ответы [ 3 ]

2 голосов
/ 06 марта 2020

Я видел, что вы используете блоки данных в стеке azure. Я думаю, что наиболее жизнеспособным и рекомендуемым для вас способом было бы использование нового проекта дельта-озера в блоках данных:

Он предоставляет варианты для различных upserts, merges и кислотные транзакции для хранилищ объектов, таких как хранилище данных s3 или azure. Он в основном обеспечивает управление, безопасность, изоляцию и поддержку / слияние, предоставляемое хранилищами данных для зон данных. Для одного конвейера Apple фактически заменила свои хранилища данных, которые будут работать исключительно на дельта-блоках данных из-за своей функциональности и гибкости. Для вашего случая использования и многих других, которые используют паркет, это просто простая замена замены «паркета» на «дельта» , чтобы использовать его функциональность (если у вас есть блоки данных). Delta - это, по сути, естественная эволюция из паркета , и блоки данных отлично справились с задачей, предоставив дополнительные функциональные возможности и предоставив их с открытым исходным кодом.

Для вашего случая я предложил бы вам попробовать опцию replaceWhere , предоставленную в delta. Перед выполнением этого целевого обновления таблица назначения должна иметь формат delta

Вместо этого:

dataset.repartition(1).write.mode('overwrite')\
                         .partitionBy('Year','Week').parquet('\curataed\dataset')

С https://docs.databricks.com/delta/delta-batch.html:

'Вы можете выборочно перезаписать только те данные, которые соответствуют предикатам над столбцами секций '

Вы можете попробовать это:

dataset.write.repartition(1)\
       .format("delta")\
       .mode("overwrite")\
       .partitionBy('Year','Week')\
       .option("replaceWhere", "Year == '2019' AND Week >='01' AND Week <='02'")\ #to avoid overwriting Week3
       .save("\curataed\dataset")

Кроме того, если вы хотите sh, чтобы привести разделы к 1, почему бы вам не использовать coalesce (1) поскольку это позволит избежать полного перемешивания.

С https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/:

' replaceWhere особенно полезно, когда вам нужно запустить вычислительно дорогой алгоритм , но только на определенных разделах '

Поэтому я лично считаю, что использование замены в другом месте для ручного указания перезаписи будет более целенаправленным и вычислительно эффективнее, чем просто полагаться: spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Блоки данных обеспечивают оптимизацию дельта-таблиц, ускоряют и Эффективный вариант рукетирования для паркета (следовательно, естественная эволюция) путем упаковки бункера и z-заказа:

Из ссылки: https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

  • ГДЕ (binpacking)

'Оптимизировать подмножество строк, соответствующих предикату секционирования. Поддерживаются только фильтры, использующие атрибуты ключа раздела. '

  • ZORDER BY

' Объединить информацию столбца в одном наборе файлов. Совместное использование используется алгоритмами пропуска данных Delta Lake для значительного сокращения объема данных, которые необходимо прочитать ».

  • Ускоренное выполнение запроса с поддержкой индексации, статистики и автоматического кэширования

  • Надежность данных с расширенной проверкой схемы и гарантиями транзакций

  • Упрощенный конвейер данных с гибкой поддержкой UPSERT и унифицированной структурированной потоковой передачей + пакетная обработка на одном источнике данных

Вы также можете ознакомиться с полной документацией проекта с открытым исходным кодом: https://docs.delta.io/latest/index.html

.. Я также хочу сказать, что я не работаю для блоков данных / дельта-лейк. Я только что видел, как их улучшения и функциональность приносят мне пользу в моей работе.

ОБНОВЛЕНИЕ:

Суть вопроса " замена существующих данных и создание новых папок для новых данных " и выполнение этого с высокой степенью масштабируемости и эффективности.

Использование перезаписи разделов Dynami c в паркете делает свою работу, однако я чувствую себя естественным Эволюция этого метода заключается в использовании операций слияния дельта-таблиц, которые были в основном созданы для 'интеграции данных из Spark DataFrames в Delta Lake' . Они предоставляют вам дополнительную функциональность и оптимизацию при объединении ваших данных, основываясь на том, как бы этого хотелось, и ведут журнал всех действий в таблице, чтобы вы могли откатить версии при необходимости.

Delta lake python API (для слияния): https://docs.delta.io/latest/api/python/index.html#delta .tables.DeltaMergeBuilder

Оптимизация блоков данных: https://kb.databricks.com/delta/delta-merge-into.html#discussion

Использование В одной операции слияния вы можете указать условие слияния, в данном случае это может быть комбинация года и недели и идентификатора, а затем, если записи совпадают (это означает, что они существуют в вашем искровом фрейме данных и дельта-таблице, week1 и week2), обновите их с данными в вашем искровом фрейме данных и оставьте другие записи без изменений:

#you can also add additional condition if the records match, but not required
.whenMatchedUpdateAll(condition=None)

В некоторых случаях, если ничего не совпадает, вы можете вставить и создать новые строки и разделы, для которых вы можете использовать:

.whenNotMatchedInsertAll(condition=None)

Вы можете использовать. converttodelta операция https://docs.delta.io/latest/api/python/index.html#delta .tables.DeltaTable.convertToDelta , чтобы преобразовать ваш паркетный стол в дельта-таблица, чтобы вы могли выполнять дельта-операции над ней с помощью API.

'Теперь вы можете преобразовать таблицу Parquet на месте в таблицу Delta Lake без перезаписи каких-либо данных. Это отлично подходит для преобразования очень больших паркетных столов, которые стоило бы переписать в виде таблицы Delta. Кроме того, этот процесс является обратимым:

Ваш случай слияния ( замена данных там, где они существуют, и создание новых записей, когда их не существует ) могут go вот так:

(не тестировал, см. примеры + api для синтаксиса)

%python  
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`\curataed\dataset`")

deltaTable.alias("target").merge(dataset, "target.Year= dataset.Year  AND target.Week = dataset.Week") \
  .whenMatchedUpdateAll()\
  .whenNotMatchedInsertAll()\
  .execute()

Если дельта-таблица разделена правильно (год, неделя) и вы правильно использовали предложение whenmatched Эти операции будут оптимизированы и могут занять несколько секунд в вашем случае. Он также обеспечивает согласованность, атомарность и целостность данных с возможностью отката.

Некоторые дополнительные функции включают в себя возможность указания набора столбцов для обновления в случае совпадения (если вам нужно только обновить определенные столбцы). Вы также можете включить spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true"), чтобы delta использовала минимальные целевые разделы для выполнения слияния (обновления, удаления, создания).

В целом, я думаю, что использование этого подхода является очень новым и инновационным способом переноса. целевые обновления, поскольку это дает вам больший контроль над ним, сохраняя при этом высокую эффективность операций. Использование партера с режимом динамического c разбиения и перезаписи также будет работать нормально, однако функции Delta Lake привносят качество данных в ваше озеро данных, которое не имеет аналогов.

Моя рекомендация: Я бы сказал, что сейчас используйте динамический c режим перезаписи разделов для файлов паркета, чтобы выполнять свои обновления, и вы можете поэкспериментировать и попробовать использовать дельта-слияние только для одной таблицы с оптимизацией блоков данных spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true") и .whenMatchedUpdateAll() и сравните производительность обоих (ваши файлы маленькие, поэтому я не думаю, что это будет большой разницей). Статья по оптимизации удаления разделов из базы данных для слияний вышла в феврале, так что она действительно новая и, возможно, может стать игровым заменителем для накладных операций дельта-слияния (поскольку под капотом они просто создают новые файлы, но сокращение раздела может ускорить это)

Примеры слияния в python, scala, sql: https://docs.databricks.com/delta/delta-update.html#merge - примеры

https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html

1 голос
/ 07 марта 2020

Поправьте меня, если я упустил что-то важное в вашем подходе, но кажется, что вы хотите записать новые данные поверх существующих данных, что обычно делается с

write.mode('append')

вместо 'overwrite'

Если вы хотите сохранить данные, разделенные по пакетам, чтобы вы могли выбрать их для загрузки в хранилище данных или аудита, нет разумного способа сделать это, кроме включения этой информации в набор данных и разделения ее во время сохранения. Например,

dataset.write.mode('append')\
                     .partitionBy('Year','Week', 'BatchTimeStamp').parquet('curated\dataset')

Любое другое ручное вмешательство в формат файла паркета будет в лучшем случае хакерским, а в худшем случае может сделать ваш трубопровод ненадежным или испортить ваши данные. также хорошее предложение для надежного хранения данных в озерах данных и золотой стандарт прямо сейчас. Для вашего конкретного c варианта использования вы могли бы использовать его функцию создания исторических запросов (добавить все, а затем запросить разницу между текущим набором данных и после предыдущего пакета), однако журнал аудита ограничен во времени тем, как вы настраиваете свое дельта-озеро и может составлять всего 7 дней, поэтому, если вы хотите получить полную информацию в долгосрочной перспективе, вам все равно следует придерживаться подхода сохранения информации о партии.

На более стратегическом уровне, когда следуете raw -> curated -> DW, вы также можете рассмотреть возможность добавления еще одного «прыжка» и помещения готовых данных в «предварительно обработанную» папку, упорядоченную по партиям, а затем добавить их как в наборы кураторов и DW.

В качестве примечания, .repartition(1) не имеет особого смысла при использовании паркетов, так как в любом случае паркет является многофайловым форматом, поэтому единственное влияние, которое это оказывает, - это негативное влияние на производительность. Но, пожалуйста, дайте мне знать, если есть конкретная c причина, по которой вы его используете.

1 голос
/ 05 марта 2020

Вместо того, чтобы писать таблицу напрямую, мы можем использовать saveAsTable с добавлением и удалением разделов до этого.

dataset.repartition(1).write.mode('append')\
                     .partitionBy('Year','Week').saveAsTable("tablename")

Для удаления предыдущих разделов

partitions = [ (x["Year"], x["Week"]) for x in dataset.select("Year", "Week").distinct().collect()]
for year, week in partitions:
    spark.sql('ALTER TABLE tablename DROP IF EXISTS PARTITION (Year = "'+year+'",Week = "'+week+'")')
...