Избегайте одного файла с параметром hive.optimize.sort.dynamic.partition - PullRequest
0 голосов
/ 21 октября 2018

Я использую hive.

Когда я пишу динамические разделы с помощью запроса INSERT и включаю параметр hive.optimize.sort.dynamic.partition (SET hive.optimize.sort.dynamic.partition=true), в каждом разделе всегда есть один файл.

Но если я отключу эту опцию (SET hive.optimize.sort.dynamic.partition=false), я получу исключение из памяти, как это.

TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1534502930145_6994_1_01_000008_3:java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:194)
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)
        at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
        at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
        at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
        at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
        at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
        at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
        at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
        at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
        at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:184)
        at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:376)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
        at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:327)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288)
        at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:67)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:128)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:117)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:286)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:271)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketForFileIdx(FileSinkOperator.java:619)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:563)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createNewPaths(FileSinkOperator.java:867)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:975)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:715)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:356)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:287)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.run(ReduceRecordProcessor.java:317)
]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:299, Vertex vertex_1534502930145_6994_1_01 [Reducer 2] killed/failed due to:OWN_TASK_FAILURE]Vertex killed, vertexName=Map 1, vertexId=vertex_1534502930145_6994_1_00, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to OTHER_VERTEX_FAILURE, failedTasks:0 killedTasks:27, Vertex vertex_1534502930145_6994_1_00 [Map 1] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1

Полагаю, это исключение возникло, потому что редуктор записывает во многие разделы одновременно,Но я не могу найти, как это контролировать.И я следовал этой статье , но она мне не помогает.

Моя среда такова:

  • AWS EMR 5.12.1
  • Используйте tez в качестве движка исполнения
  • версия куста - 2.3.2, а версия tez - 0.8.2
  • Размер блока HDFS - 128 МБ
  • Существует около 30 динамических разделов длянаписать запрос INSERT

Вот мой пример запроса.

SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.optimize.sort.dynamic.partition=true;
SET hive.exec.reducers.bytes.per.reducer=1048576;
SET mapred.reduce.tasks=300;
FROM raw_data
INSERT OVERWRITE TABLE idw_data
  PARTITION(event_timestamp_date)
  SELECT
    *
  WHERE 
    event_timestamp_date BETWEEN '2018-09-09' AND '2018-10-09' 
DISTRIBUTE BY event_timestamp_date
;

Ответы [ 2 ]

0 голосов
/ 24 октября 2018

Наконец-то я понял, что не так.

Прежде всего, механизм исполнения был tez.mapreduce.reduce.memory.mb вариант не помог.Вы должны использовать опцию hive.tez.container.size.При записи динамического раздела, редуктор открывает несколько записей.Редуктору требуется достаточно памяти для одновременной записи нескольких разделов.

Если вы используете опцию hive.optimize.sort.dynamic.partition, глобальная сортировка разделов выполняется, но сортировка означает, что существуют редукторы.В этом случае, если нет других задач редуктора, каждый раздел обрабатывается одним редуктором.Вот почему в разделе есть только один файл.DISTRIBUTE BY делает больше задач сокращения, поэтому он может создавать больше файлов в каждом разделе, но при этом возникает та же проблема с памятью.

Следовательно, размер памяти контейнеров действительно важен!Не забудьте использовать опцию hive.tez.container.size, чтобы изменить размер памяти контейнера tez!

0 голосов
/ 21 октября 2018

distribute by partition key помогает с проблемой OOM, но эта конфигурация может привести к тому, что каждый редуктор записывает весь раздел, в зависимости от конфигурации hive.exec.reducers.bytes.per.reducer, для которой по умолчанию может быть установлено очень высокое значение, например, 1 ГБ.distribute by partition key может вызвать дополнительную стадию уменьшения, то же самое происходит hive.optimize.sort.dynamic.partition.

Итак, чтобы избежать OOM и достичь максимальной производительности:

  1. добавьте distribute by partition key в конце вашего запроса вставки, это приведет к тому, что одни и те же ключи раздела будут обрабатываться одним и тем жередуктор (ы).В качестве альтернативы или в дополнение к этому параметру вы можете использовать hive.optimize.sort.dynamic.partition=true
  2. и установить hive.exec.reducers.bytes.per.reducer в значение, которое вызовет большее количество редукторов, если в одном разделе слишком много данных.Просто проверьте текущее значение hive.exec.reducers.bytes.per.reducer и уменьшите или увеличьте его соответствующим образом, чтобы получить правильный параллелизм редуктора.Этот параметр определяет, сколько данных будет обрабатывать один редуктор и сколько файлов будет создано для каждого раздела.

Пример:

set hive.exec.reducers.bytes.per.reducer=33554432;

insert overwrite table partition (load_date)
select * from src_table
distribute by load_date;

См. Также этот ответ об управлении количествомкартографы и редукторы: https://stackoverflow.com/a/42842117/2700344

...