Увеличение параллелизма записи с паркетом в улье - PullRequest
0 голосов
/ 10 ноября 2019

tl; dr - я записываю много данных в новую таблицу формата Parquet в Hive, но в задании используется гораздо меньше редукторов, чем указано, поэтому запись занимает гораздо больше времени, чем хотелось бы.

Я создаю таблицу озера данных, предназначенную для быстрого чтения, с помощью Spark, но я записываю данные с помощью куста, чтобы а) таблицы с корзинами можно было прочитать с помощью улья, и б) чтобы я мог записывать статистику в метасторское хранилище улья.

Я создаю таблицу из python следующим образом:

hivecur.execute("set hive.cbo.enable=true")
hivecur.execute("set hive.compute.query.using.stats=true")
hivecur.execute("set hive.stats.fetch.column.stats=true")
hivecur.execute("set hive.stats.fetch.partition.stats=true")

hivecur.execute("set hive.vectorized.execution.enabled=true")
hivecur.execute("set hive.vectorized.execution.reduce.enabled=true")

hivecur.execute('set mapred.reduce.tasks=100')

hivecur.execute(f"set dfs.block.size={1024*1024*128}")
hivecur.execute(f"set parquet.block.size={1024*1024*128}")

hivecur.execute(f"drop table if exists {TABLE_NAME}")

table_create_qry = f"""
create table {TABLE_NAME} (
    {schema.dice}
)
partitioned by (process_date_z int, dataset string)
clustered by (id) sorted by (source_id, type, id) into 200 buckets
stored as parquet
TBLPROPERTIES ("comment" = "{git_hash()}",
               "parquet.compress" = "snappy")

А потом, когда вставляю:

qry = f"""
        insert overwrite table {TABLE_NAME} partition (process_date_z, dataset)
        select ...
            source_id,
            process_date_z,
            '{dataset}' as dataset
        from {source_table}
        where process_date_z = {d}
        and pmod(hash(id),100) in ({",".join([str(x) for x in id_filters])})"""

Установив mapred.reduce.tasks=100, я надеялся, чтозаставить каждый раздел содержать 100 файлов отдавать или брать. Вместо этого, хотя создано 100 задач, 92 завершают работу очень быстро, а восемь задач сокращения работают дольше, записывая файлы с небольшим количеством десятков (но не 100) примерно одинакового размера.

Проблема с этим заключается в том, что сокращение является существенным узким местом в работе. процесс записи. Какой параметр я могу установить для улучшения производительности?

1 Ответ

0 голосов
/ 12 ноября 2019

Думаю, моя проблема возникла из-за глупого выбора хэш-функции.

Я подозреваю, что алгоритм, используемый для группирования по идентификатору, был тем же хешем, который я использовал для подстановки идентификаторов, и поэтому он создал корзину для всех возможных входных идентификаторов, но pmod WHERE позволил ему заполнить только несколько.

Чтобы решить эту проблему, я переключил хеш внутри pmod с помощью Murmurhash3 UDF из кирпичного дома.

https://github.com/klout/brickhouse

...