pyspark записывает в таблицу кустов уменьшенное / сжатое количество небольших файлов - PullRequest
0 голосов
/ 19 июня 2020

У меня есть запись фрейма данных, обновляемая каждый раз при запуске процесса, это означает, что у меня будет фрейм данных из одной строки и 4 столбцов каждый раз, когда процесс завершится. Затем я вставлю его в таблицу улья, используя запись фрейма данных и в формате паркета. Из-за одной записи за раз я вижу так много маленьких файлов в папке таблицы в hfds.

Не могли бы вы сообщить мне, как уменьшить и записать их в тот же файл (файл паркета), когда я Записываю данные в таблицу куста ??

hdfs location: user_id/employe_db/market_table/
from:
part-04498-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04497-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04450-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04449-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

to:
part-03049-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

Как уменьшить количество файлов паркета до фиксированного количества файлов меньше и загружать / записывать новые данные в существующие файлы ?? part-04499-f33fc4b5-47d9-4d14-b37e-8f670cb2c53 c -c000.snappy.parquet

1 Ответ

0 голосов
/ 19 июня 2020

Перед записью в HDFS вы можете repartition(1), чтобы вы создавали 1 файл за одно выполнение.

df.repartition(1).write.parquet("<directory>")

Merging files:

Using Hive:

Если у вас уже есть таблица кустов поверх каталога user_id/employe_db/market_table/, запустите вставку перезаписи, выбрав ту же таблицу.

spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market>")

- чтобы создать только один файл, используйте порядок

spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market> order by <column>")

Вы также можете запускать операторы вставки, как и в Hive.

(или)

Using Spark:

В качестве процесса загрузки сообщения вы можете снова читать паркетные файлы из каталога, затем снова выполнить разделение и записать в каталог.

df_src=spark.read.parquet("<directory>")
df_src.repartition(<number>).write.mode("overwrite").parquet("<directory>")

NOTE

  • перезапись сначала удаляет каталог; в случае сбоя задания в промежутке мы можем столкнуться с потерей данных.
  • Лучше всего сделать резервную копию данных в каталоге tmp, а затем перезаписать только
...