Как добавить строки в существующий раздел в Spark? - PullRequest
1 голос
/ 04 августа 2020

Мне нужно обновить исторические данные. Под обновлением я подразумеваю добавление новых строк, а иногда и новых столбцов в существующий раздел на S3.

Текущее разбиение осуществляется по дате: created_year={}/created_month={}/created_day={}. Чтобы избежать слишком большого количества объектов на раздел, я делаю следующее, чтобы поддерживать один объект / раздел:

def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").parquet(dest_path)
    print('Data repartitioning complete with at the following location: ')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions

Существует сценарий, в котором мне нужно добавить определенные строки с этими столбцовыми значениями:

created_year | created_month | created_day
2019         |10             |27   

Это означает, что файл (объект S3) по этому пути: created_year=2019/created_month=10/created_day=27/some_random_name.parquet будет дополнен новыми строками.

Если есть изменение в схеме, то все объекты будут иметь для реализации этого изменения.

Я попытался изучить, как это работает в целом, поэтому есть два режима, представляющих интерес: перезапись, добавление.

Первый просто добавит текущие данные и удалит остальные. Я не хочу такой ситуации. Второй будет добавлен, но может привести к созданию дополнительных объектов . Я тоже не хочу такой ситуации. Я также читал, что фреймы данных неизменяемы в Spark.

Итак, как мне добиться добавления новых данных по мере их поступления в существующие разделы и поддержания одного объекта в день?

1 Ответ

1 голос
/ 05 августа 2020

На основании вашего вопроса я понимаю, что вам нужно добавить новые строки к существующим данным, не увеличивая количество паркетных файлов. Этого можно добиться, выполнив операции с определенными c папками разделов . При этом может быть три случая.

1) Новый раздел

Это означает, что входящие данные имеют новое значение в столбцах раздела. В вашем случае это может быть так:

Существующие данные

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  1  |

Новые данные

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  2  |

Итак , в этом случае вы можете просто создать новую папку раздела для входящих данных и сохранить ее, как и вы.

partition_path = "/path/to/data/year=2020/month=1/day=2"
new_data.repartition(1, "year", "month", "day").write.parquet(partition_path)

2) Существующий раздел, новые данные

Здесь вы хотите добавить новые строки к существующим данным . Это может быть так:

Существующие данные

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |

Новые данные

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  b  |   1   |

Здесь у нас есть новые запись для того же раздела. Вы можете использовать «режим добавления», но вам нужен единственный паркетный файл в каждой папке раздела. Вот почему вы должны сначала прочитать существующий раздел, объединить его с новыми данными, а затем записать его обратно.

partition_path = "/path/to/data/year=2020/month=1/day=1"
old_data = spark.read.parquet(partition_path)
write_data = old_data.unionByName(new_data)
write_data.repartition(1, "year", "month", "day").write.parquet(partition_path)

3) Существующий раздел, существующие данные

Что, если входящие данные UPDATE , а не INSERT ? В этом случае вам следует обновить строку вместо того, чтобы вставлять новую. Представьте себе это:

Существующие данные

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |

Новые данные

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   2   |

"a" имело значение 1 раньше, теперь мы хотим, чтобы это было 2. Итак, в этом случае вы должны прочитать существующие данные и обновить существующие записи. Это может быть достигнуто следующим образом.

partition_path = "/path/to/data/year=2020/month=1/day=1"
old_data = spark.read.parquet(partition_path)
write_data = old_data.join(new_data, ["year", "month", "day", "key"], "outer")
write_data = write_data.select(
    "year", "month", "day", "key",
    F.coalesce(new_data["value"], old_data["value"]).alias("value")
)
write_data.repartition(1, "year", "month", "day").write.parquet(partition_path)

Когда мы соединяем старые данные с новыми, может быть четыре вещи,

  • оба данных имеют одинаковое значение , не имеет значения, какой из них
  • два данных имеют разные значения, возьмите новое значение
  • старые данные не имеют значения, новые данные имеют, возьмите новое
  • новые данные не имеют значения, старые данные имеют, возьмите старые

Чтобы выполнить то, что мы здесь желаем, coalesce из pyspark.sql.functions выполнит свою работу.

Обратите внимание, что это решение охватывает и второй случай.

Об изменении схемы

Spark поддерживает объединение схем для формата файла паркета. Это означает, что вы можете добавлять столбцы в данные или удалять их. Добавляя или удаляя столбцы, вы поймете, что некоторые столбцы отсутствуют при чтении данных с верхнего уровня. Это связано с тем, что Spark по умолчанию отключает слияние схем. Из документации :

Подобно Protocol Buffer, Avro и Thrift, Parquet также поддерживает эволюцию схемы. Пользователи могут начать с простой схемы и постепенно добавлять в схему дополнительные столбцы по мере необходимости. Таким образом, пользователи могут получить несколько файлов Parquet с разными, но взаимно совместимыми схемами. Источник данных Parquet теперь может автоматически определять этот случай и объединять схемы всех этих файлов.

Чтобы иметь возможность читать все столбцы, вам необходимо установить для параметра mergeSchema значение true.

df = spark.read.option("mergeSchema", "true").parquet(path)
...