Размер файла паркета, пожарный шланг против искры - PullRequest
5 голосов
/ 29 июня 2019

Я генерирую файлы Parquet двумя способами: Kinesis Firehose и Spark.Они оба записаны в одну структуру разделов на S3.Оба набора данных могут быть запрошены с использованием одного и того же определения таблицы Athena.Оба используют сжатие gzip.

Я заметил, однако, что файлы Parquet, сгенерированные Spark, примерно в 3 раза больше, чем файлы из Firehose.По какой причине это должно быть так?Я замечаю некоторые различия в схемах и метаданных, когда загружаю их с помощью Pyarrow:

>>> import pyarrow.parquet as pq
>>> spark = pq.ParquetFile('<spark object name>.gz.parquet')
>>> spark.metadata
<pyarrow._parquet.FileMetaData object at 0x101f2bf98>
  created_by: parquet-mr version 1.8.3 (build aef7230e114214b7cc962a8f3fc5aeed6ce80828)
  num_columns: 4
  num_rows: 11
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1558
>>> spark.schema
<pyarrow._parquet.ParquetSchema object at 0x101f2f438>
uri: BYTE_ARRAY UTF8
dfpts.list.element: BYTE_ARRAY UTF8
udids.list.element: BYTE_ARRAY UTF8
uuids.list.element: BYTE_ARRAY UTF8

>>> firehose = pq.ParquetFile('<firehose object name>.parquet')
>>> firehose.metadata
<pyarrow._parquet.FileMetaData object at 0x10fc63458>
  created_by: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
  num_columns: 4
  num_rows: 156
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1017
>>> firehose.schema
<pyarrow._parquet.ParquetSchema object at 0x10fc5e7b8>
udids.bag.array_element: BYTE_ARRAY UTF8
dfpts.bag.array_element: BYTE_ARRAY UTF8
uuids.bag.array_element: BYTE_ARRAY UTF8
uri: BYTE_ARRAY UTF8

Возможно ли, что разница в схемах является виновником?Что-то еще?

Эти два конкретных файла не содержат точных одних и тех же данных, но по моим запросам Athena общая мощность всех списков для всех строк в файле Firehose составляет примерно 2,5.x что находится в файле Spark.

РЕДАКТИРОВАНИЕ В ДОБАВЛЕНИЕ:

Я написал следующее, чтобы по существу выгрузить содержимое каждого файла паркета в стандартный вывод по одной строке в строке:

import sys
import pyarrow.parquet as pq

table = pq.read_table(sys.argv[1])
pydict = table.to_pydict()
for i in range(0, table.num_rows):
    print(f"{pydict['uri'][i]}, {pydict['dfpts'][i]}, {pydict['udids'][i]}, {pydict['uuids'][i]}")

Затем я запустил это для каждого файла паркета и передал вывод в файл.Ниже приведены размеры исходных двух файлов, выходные данные, указывающие приведенный выше код Python на каждый файл, и версия этого файла в формате gzip:

-rw-r--r--  1 myuser  staff  1306337 Jun 28 16:19 firehose.parquet
-rw-r--r--  1 myuser  staff  8328156 Jul  2 15:09 firehose.printed
-rw-r--r--  1 myuser  staff  5009543 Jul  2 15:09 firehose.printed.gz
-rw-r--r--  1 myuser  staff  1233761 Jun 28 16:23 spark.parquet
-rw-r--r--  1 myuser  staff  3213528 Jul  2 15:09 spark.printed
-rw-r--r--  1 myuser  staff  1951058 Jul  2 15:09 spark.printed.gz

Обратите внимание, что два файла паркета имеют примерно одинаковый размер, но «напечатанное» содержимое файла пожарного шланга примерно в 2,5 раза больше размера «напечатанного» содержимого из файла искры.И они примерно одинаково сжимаемы.

Итак: что занимает все пространство в файле паркета Spark, если это не необработанные данные?

ИЗМЕНЕНО В ДОБАВИТЬ:

Ниже приведен вывод "parquet-tools meta".Коэффициенты сжатия для каждого столбца выглядят одинаково, но файл пожарного шланга содержит гораздо больше значений на несжатый байт.Для столбца "dfpts":

пожарный шланг:

SZ:667849/904992/1.36 VC:161475

spark:

SZ:735561/1135861/1.54 VC:62643

мета-вывод parquet-tools:

file:            file:/Users/jh01792/Downloads/firehose.parquet 
creator:         parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 

file schema:     hive_schema 
--------------------------------------------------------------------------------
udids:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
dfpts:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
uuids:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
uri:             OPTIONAL BINARY L:STRING R:0 D:1

row group 1:     RC:156 TS:1905578 OFFSET:4 
--------------------------------------------------------------------------------
udids:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:4 SZ:421990/662241/1.57 VC:60185 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 58, min/max not defined]
dfpts:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:421994 SZ:667849/904992/1.36 VC:161475 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 53, min/max not defined]
uuids:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:1089843 SZ:210072/308759/1.47 VC:39255 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 32, min/max not defined]
uri:              BINARY GZIP DO:0 FPO:1299915 SZ:5397/29586/5.48 VC:156 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]

file:        file:/Users/jh01792/Downloads/spark.parquet 
creator:     parquet-mr version 1.8.3 (build aef7230e114214b7cc962a8f3fc5aeed6ce80828) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"uri","type":"string","nullable":false,"metadata":{}},{"name":"dfpts","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"udids","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"uuids","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
uri:         REQUIRED BINARY L:STRING R:0 D:0
dfpts:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3
udids:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3
uuids:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3

row group 1: RC:11 TS:1943008 OFFSET:4 
--------------------------------------------------------------------------------
uri:          BINARY GZIP DO:0 FPO:4 SZ:847/2530/2.99 VC:11 ENC:PLAIN,BIT_PACKED ST:[num_nulls: 0, min/max not defined]
dfpts:       
.list:       
..element:    BINARY GZIP DO:0 FPO:851 SZ:735561/1135861/1.54 VC:62643 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
udids:       
.list:       
..element:    BINARY GZIP DO:0 FPO:736412 SZ:335289/555989/1.66 VC:23323 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
uuids:       
.list:       
..element:    BINARY GZIP DO:0 FPO:1071701 SZ:160494/248628/1.55 VC:13305 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]

Ответы [ 2 ]

1 голос
/ 05 июля 2019

Возможно, вам следует сформулировать свой вопрос по-другому:

Почему сжатие данных Firehose более эффективно, чем данные Spark?

У вас есть несколько возможных объясненийэто в Parquet:

  • Количество элементов в различных значениях столбцов

    В дополнение к схеме сжатия, Parquet пытается использовать наиболее эффективную кодировку для ваших значений,Специально для BYTE_ARRAY он по умолчанию попытается использовать словарную кодировку, то есть сопоставить каждое отдельное значение BYTE_ARRAY с типом int, а затем просто сохранить целые числа в данных столбца (дополнительная информация здесь ).Если словарь становится слишком большим, он будет просто сохранять значения BYTE_ARRAY.

    Если ваш набор данных Firehose содержит намного меньшее разнообразие значений, чем ваш набор данных Spark, один из них может использовать эффективную кодировку словаря, а другойне.

  • Сортированные данные

    Сортированные данные обычно сжимаются намного лучше, чем несортированные данные, поэтому, если значения в столбцах Firehose отсортированы естественным образом (или по крайней меречаще повторяется), кодирование паркета и сжатие gzip обеспечат гораздо лучшую степень сжатия

  • Другой размер группы строк

    Паркет разбивает значения вгруппы строк с регулируемым размером (parquet.block.size config в Spark).Сжатие и кодирование применяются на уровне группы строк, поэтому чем больше группа строк, тем лучше сжатие, но, возможно, хуже кодирование (например, вы можете перейти от словарной кодировки к обычным значениям byte_array) и более высокие требования к памяти при чтении или записи.

Как узнать, что происходит в вашем случае?

Используйте parquet-tools , чтобы проверить подробные данные кодирования дляваши столбцы:

Например, в одном из моих наборов данных:

$ parquet-tools meta part-00015-6a77dcbe-3edd-4199-bff0-efda0f512d61.c000.snappy.parquet

...

row group 1:              RC:63076 TS:41391030 OFFSET:4
--------------------------------------------------------------------------------
options:
.list:
..element:                 BINARY SNAPPY DO:0 FPO:6042924 SZ:189370/341005/1,80 VC:269833 ENC:RLE,PLAIN_DICTIONARY ST:[no stats for this column]

...

row group 2:              RC:28499 TS:14806649 OFFSET:11648146
--------------------------------------------------------------------------------
options:
.list:
..element:                 BINARY SNAPPY DO:0 FPO:13565454 SZ:78631/169832/2,16 VC:144697 ENC:RLE,PLAIN_DICTIONARY ST:[no stats for this column]

Атрибут ENC в данных столбца дает вам кодировку, используемую для столбца (в данном случае DICTIONARY)Атрибут SZ дает вам compressed size/uncompressed size/compression ratio и VC количество закодированных значений.

В моем примере вы можете видеть, что степень сжатия немного лучше в группе строк 2, чем в группе строк 1 просто из-за данныхраспределение.

Обновление :

Просматривая статистику, предоставленную вами, вы можете увидеть, что столбец dfpts в вашем наборе данныхимеет средний размер закодированного значения 904992/161475 = 5,6 байта, тогда как версия искры имеет 1135861/62643 = 18,13 байта, даже несмотря на то, что оба кода имеют одинаковую кодировку словаря.Это, вероятно, означает, что RLE намного более эффективен в вашем наборе данных пожарного шланга, потому что у вас есть много повторяющихся значений или намного меньше различных значений.Если вы отсортируете столбец dfpts в искре перед сохранением в паркет, вы, вероятно, достигнете коэффициентов кодирования, аналогичных вашим данным пожарного шланга.

0 голосов
/ 05 июля 2019

Две вещи, которые я могу придумать, чем можно объяснить разницу.
1. Свойства паркета.
В Spark вы можете найти все свойства, связанные с паркетом, используя следующие фрагменты.
Если свойства были установлены с помощью конфигов Hadoop,

import scala.collection.JavaConverters._

// spark = SparkSsssion
spark.sparkContext.hadoopConfiguration.asScala.filter {
  x =>
    x.getKey.contains("parquet")
}.foreach(println)

Если свойства были установлены с помощью Spark (spark-defaults.conf, --conf и т. Д.)

spark.sparkContext.getConf.getAll.filter {
  case(key, value) => key.contains("parquet")
}.foreach(println)

Если мы сможем получить конфиги пожарных шлангов (которые я не знаю), мы могли бы сделать сравнение. Иначе также конфиги должны дать общее представление о том, что может быть не так.
2. Разница в версиях паркета между Spark и FireHose.
Сообщество Parquet могло изменить настройки по умолчанию для паркетных конфигов между версиями.

...