Как установить размер группы строк для файлов в формате hdf? - PullRequest
0 голосов
/ 15 ноября 2018

Я провожу несколько экспериментов с размером блока (dfs.block.size) и размером группы строк (parquet.block.size) в hdfs.

У меня большой набор данных в формате hdf, и яхотите скопировать данные с различными размерами блоков и групп строк для тестирования.Я могу скопировать данные с другим размером блока, используя:

hdfs dfs -D dfs.block.size=67108864 -D parquet.block.size=67108864 -cp /new_sample_parquet /new_sample_parquet_64M

Но изменяется только dfs.block.size.Я проверяю hdfs dfs -stat для размера блока и parquet-tools meta для размера группы строк.Фактически, если я заменю parquet.block.size на blah.blah.blah, это будет иметь тот же эффект.Я даже вошел в spark-shell и установил свойство parquet.block.size вручную, используя

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864).

Я использую hadoop 3.1.0.Я получил имя свойства parquet.block.size от здесь .

Вот первые 10 строк вывода моей попытки

row group 1:                    RC:4140100 TS:150147503 OFFSET:4
row group 2:                    RC:3520100 TS:158294646 OFFSET:59176084
row group 3:                    RC:880100 TS:80122359 OFFSET:119985867
row group 4:                    RC:583579 TS:197303521 OFFSET:149394540
row group 5:                    RC:585594 TS:194850776 OFFSET:213638039
row group 6:                    RC:2620100 TS:130170698 OFFSET:277223867
row group 7:                    RC:2750100 TS:136761819 OFFSET:332088066
row group 8:                    RC:1790100 TS:86766854 OFFSET:389772650
row group 9:                    RC:2620100 TS:125876377 OFFSET:428147454
row group 10:                   RC:1700100 TS:83791047 OFFSET:483600973

Как вы можете видеть,TS (общий размер) намного больше 64 МБ (67108864 байта)

Моя текущая теория:

Я делаю это в Spark-Shell:

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
val a = spark.read.parquet("my_sample_data")
a.rdd.getNumPartitions // 1034
val s = a.coalesce(27)
s.write.format("parquet").mode("Overwrite").options(Map("dfs.block.size" -> "67108864")).save("/my_new_sample_data")

Таквозможно, это потому, что мои входные данные уже имеют 1034 раздела.Я действительно не уверен.Мои данные содержат около 118 столбцов в строке.

1 Ответ

0 голосов
/ 19 ноября 2018

Свойство parquet.block.size влияет только на авторов паркета.С другой стороны, команда hdfs dfs -cp копирует файлы независимо от их содержимого.Поэтому свойство parquet.block.size игнорируется hdfs dfs -cp.

Представьте, что у вас есть приложение, которое делает снимки экрана в формате JPG или PNG, в зависимости от файла конфигурации.Вы делаете копию этих скриншотов с помощью команды cp.Естественно, даже если вы измените желаемый формат изображения в файле конфигурации, команда cp всегда будет создавать выходные файлы в формате изображения исходных файлов, независимо от файла конфигурации.Файл конфигурации используется только приложением, создающим снимок экрана, а не cp.Вот как работает свойство parquet.block.size.

Чтобы изменить размер блока, нужно переписать файл.Вы упомянули, что у вас есть spark-shell.Используйте это, чтобы переписать файл Parquet, выполнив

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
var df = spark.read.parquet("/path/to/input.parquet")
df.write.parquet("/path/to/output")

Обновление : так как вы упомянули в комментариях ниже, что он не работает для вас, я провел эксперимент и опубликовал стенограмму сеансаниже:

$ spark-shell
scala> sc.hadoopConfiguration.setInt("parquet.block.size", 200000)
scala> var df = spark.read.parquet("/tmp/infile.parquet")
df: org.apache.spark.sql.DataFrame = [field0000: binary, field0001: binary ... 78 more fields]
scala> df.write.parquet("/tmp/200K")
scala> df.write.format("parquet").mode("Overwrite").options(Map("parquet.block.size" -> "300000")).save("/tmp/300K")
scala> :quit
$ hadoop fs -copyToLocal /tmp/{200K,300K} /tmp
$ parquet-tools meta /tmp/infile.parquet | grep "row group" | head -n 3
row group 1:  RC:4291 TS:5004800 OFFSET:4
row group 2:  RC:3854 TS:4499360 OFFSET:5004804
row group 3:  RC:4293 TS:5004640 OFFSET:10000000
$ parquet-tools meta /tmp/200K/part-00000-* | grep "row group" | head -n 3
row group 1:   RC:169 TS:202080 OFFSET:4
row group 2:   RC:168 TS:201760 OFFSET:190164
row group 3:   RC:169 TS:203680 OFFSET:380324
$ parquet-tools meta /tmp/300K/part-00000-* | grep "row group" | head -n 3
row group 1:   RC:254 TS:302720 OFFSET:4
row group 2:   RC:255 TS:303280 OFFSET:284004
row group 3:   RC:263 TS:303200 OFFSET:568884

Посмотрев на значения TS, вы увидите, что входной файл имеет размер группы строк 4,5-5M, а выходные файлы имеют размеры групп строк 200K и 300K соответственно.Это показывает, что значение, установленное с помощью sc.hadoopConfiguration, становится «по умолчанию», в то время как другой метод, упомянутый в комментарии ниже, включающий df.options, переопределяет это значение по умолчанию.

Обновление 2 : сейчасчто вы опубликовали свой вывод, я вижу, что происходит.В вашем случае происходит сжатие, увеличивая объем данных, которые будут помещаться в группы строк.Размер группы строк применяется к сжатым данным, но TS показывает размер несжатых данных.Однако вы можете определить размер групп строк, вычтя их начальные смещения.Например, сжатый размер вашей первой группы строк составляет 59176084 - 4 = 59176080 байт или меньше (поскольку заполнение также может иметь место).Я скопировал ваши результаты в /tmp/rowgroups.dat на моем компьютере и рассчитал размеры вашей группы строк, введя следующую команду:

$ cat /tmp/rowgroups.dat | sed 's/.*OFFSET://' | numinterval
59176080
60809783
29408673
64243499
63585828
54864199
57684584
38374804
55453519

(Команда numinterval находится в пакете num-utils в Ubuntu.) Как видите, все ваши группы строк меньше указанного вами размера группы строк.(Причиной, по которой они не имеют точно указанного размера, является PARQUET-1337 .)

...