UPSERT в паркет Писпарк - PullRequest
       27

UPSERT в паркет Писпарк

3 голосов
/ 26 января 2020

У меня есть паркетные файлы в s3 со следующими разделами: год / месяц / дата / some_id Используя Spark (PySpark), я каждый день хотел бы использовать UPSERT последние 14 дней - я хотел бы заменить существующие данные в s3 (один файл паркета для каждого раздела), но не удалять дни, предшествующие 14 дням. Я пробовал два режима сохранения: append - не очень хорошо, потому что это просто добавляет другой файл. перезаписать - удаляет прошлые данные и данные для других разделов.

Есть ли способ или лучший способ преодолеть это? я должен прочитать все данные от s3 в каждом запуске, и записать это снова? может быть, переименование файлов, чтобы append заменил текущий файл в s3?

Большое спасибо!

Ответы [ 3 ]

4 голосов
/ 27 января 2020

Я обычно делаю что-то подобное. В моем случае я делаю ETL и добавляю данные за один день в файл parquet :

Ключом является работа с данными, которые вы хотите записать (в моем случае фактическая дата) , убедитесь, что разделен на столбец date и перезапишите все данные для текущей даты .

Это сохранит все старые данные. Как пример:

(
    sdf
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("date")
    .option("replaceWhere", "2020-01-27")
    .save(uri)
)

Также вы можете взглянуть на delta.io , который является расширением формата parquet , который предоставляет некоторые интересные функции, такие как КИСЛОТА транзакции.

1 голос
/ 05 февраля 2020

Спасибо всем за полезные решения. Я закончил тем, что использовал некоторую конфигурацию, которая служила моему сценарию использования - используя режим overwrite , когда я пишу паркет, вместе с этой конфигурацией:

Я добавил эту конфигурацию:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

с этой конфигурацией искра будет перезаписывать только те разделы, для которых у нее есть данные для записи. Все остальные (прошлые) разделы остаются без изменений - см. Здесь:

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-dynamic-partition-inserts.html

1 голос
/ 27 января 2020

Насколько мне известно, у S3 нет операции обновления. Как только объект добавлен в s3, его нельзя изменить. (либо нужно заменить другой объект, либо добавить файл)

В любом случае, если вы хотите прочитать все данные, вы можете указать временную шкалу, которую вы хотите прочитать, отсечение разделов помогает в чтении только разделов. в сроки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...