На основании вашего вопроса я понимаю, что вам нужно добавить новые строки к существующим данным, не увеличивая количество паркетных файлов. Этого можно добиться, выполнив операции с определенными c папками разделов . При этом может быть три случая.
1) Новый раздел
Это означает, что входящие данные имеют новое значение в столбцах раздела. В вашем случае это может быть так:
Существующие данные
| year | month | day |
| ---- | ----- | --- |
| 2020 | 1 | 1 |
Новые данные
| year | month | day |
| ---- | ----- | --- |
| 2020 | 1 | 2 |
Итак , в этом случае вы можете просто создать новую папку раздела для входящих данных и сохранить ее, как и вы.
partition_path = "/path/to/data/year=2020/month=1/day=2"
new_data.repartition(1, "year", "month", "day").write.parquet(partition_path)
2) Существующий раздел, новые данные
Здесь вы хотите добавить новые строки к существующим данным . Это может быть так:
Существующие данные
| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 | 1 | 1 | a | 1 |
Новые данные
| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 | 1 | 1 | b | 1 |
Здесь у нас есть новые запись для того же раздела. Вы можете использовать «режим добавления», но вам нужен единственный паркетный файл в каждой папке раздела. Вот почему вы должны сначала прочитать существующий раздел, объединить его с новыми данными, а затем записать его обратно.
partition_path = "/path/to/data/year=2020/month=1/day=1"
old_data = spark.read.parquet(partition_path)
write_data = old_data.unionByName(new_data)
write_data.repartition(1, "year", "month", "day").write.parquet(partition_path)
3) Существующий раздел, существующие данные
Что, если входящие данные UPDATE , а не INSERT ? В этом случае вам следует обновить строку вместо того, чтобы вставлять новую. Представьте себе это:
Существующие данные
| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 | 1 | 1 | a | 1 |
Новые данные
| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 | 1 | 1 | a | 2 |
"a" имело значение 1 раньше, теперь мы хотим, чтобы это было 2. Итак, в этом случае вы должны прочитать существующие данные и обновить существующие записи. Это может быть достигнуто следующим образом.
partition_path = "/path/to/data/year=2020/month=1/day=1"
old_data = spark.read.parquet(partition_path)
write_data = old_data.join(new_data, ["year", "month", "day", "key"], "outer")
write_data = write_data.select(
"year", "month", "day", "key",
F.coalesce(new_data["value"], old_data["value"]).alias("value")
)
write_data.repartition(1, "year", "month", "day").write.parquet(partition_path)
Когда мы соединяем старые данные с новыми, может быть четыре вещи,
- оба данных имеют одинаковое значение , не имеет значения, какой из них
- два данных имеют разные значения, возьмите новое значение
- старые данные не имеют значения, новые данные имеют, возьмите новое
- новые данные не имеют значения, старые данные имеют, возьмите старые
Чтобы выполнить то, что мы здесь желаем, coalesce
из pyspark.sql.functions
выполнит свою работу.
Обратите внимание, что это решение охватывает и второй случай.
Об изменении схемы
Spark поддерживает объединение схем для формата файла паркета. Это означает, что вы можете добавлять столбцы в данные или удалять их. Добавляя или удаляя столбцы, вы поймете, что некоторые столбцы отсутствуют при чтении данных с верхнего уровня. Это связано с тем, что Spark по умолчанию отключает слияние схем. Из документации :
Подобно Protocol Buffer, Avro и Thrift, Parquet также поддерживает эволюцию схемы. Пользователи могут начать с простой схемы и постепенно добавлять в схему дополнительные столбцы по мере необходимости. Таким образом, пользователи могут получить несколько файлов Parquet с разными, но взаимно совместимыми схемами. Источник данных Parquet теперь может автоматически определять этот случай и объединять схемы всех этих файлов.
Чтобы иметь возможность читать все столбцы, вам необходимо установить для параметра mergeSchema
значение true
.
df = spark.read.option("mergeSchema", "true").parquet(path)