Вставка искры в пространство кучи Java - PullRequest
0 голосов
/ 22 февраля 2019

При запуске моего приложения scala-spark я получаю следующее исключение:

19/02/22 11:27:48 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, localhost): java.lang.OutOfMemoryError: Java heap space
at parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
at parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
at parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
at parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
at parquet.column.values.dictionary.DictionaryValuesWriter$PlainFixedLenArrayDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:305)
at parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:137)
at parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:179)
at parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
at parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:84)
at parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
at parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:309)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267)
at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:65)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:246)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.org$apache$spark$sql$hive$SparkHiveDynamicPartitionWriterContainer$$newWriter$1(hiveWriterContainers.scala:240)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter(hiveWriterContainers.scala:249)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:112)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)

Приложение пытается вставить фрейм данных из таблицы DB2 (JdbcConnection) в секционированную таблицу Hive parquet.

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hiveContext.setConf("parquet.block.size","50000000")
hiveContext.setConf("parquet.memory.pool.ratio", "0.2")

val dfPartitionString = data.withColumn(partitionColumnName, data(partitionColumnName).cast(StringType))

val columnNames = dfPartitionString.columns
var dfRenamed = dfPartitionString
for (columnName <- columnNames){
  dfRenamed = dfRenamed.withColumnRenamed(columnName, columnName.toLowerCase)
}

dfRenamed.write.mode("append").partitionBy(partitionColumnName).insertInto(hiveTableFullName)

Spark-версия 1.6.1.Я запускаю приложение с помощью следующей команды:

spark-submit --jars $JAR_CRASHACC --class myAppClass myApp.jar application.conf --master yarn --deploy-mode cluster --driver-memory 20g --executor-memory 50g --num-executors 3 --executor-cores 3 \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:config/log4j.properties -DLOG_PATH=/logs/ -DTMS_LANCIO=20190222115433 spark.driver.extraJavaOptions=-Xmx2000m spark.driver.maxResultSize=4g" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG4GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError" --conf spark.sql.sources.maxConcurrentWrites=10 --conf spark.eventLog.enabled=true

1 Ответ

0 голосов
/ 25 февраля 2019

Как описано в предлагаемых ссылках, проблема была связана с памятью кучи Java, зарезервированной для драйвера / исполнителей.В любом случае, как сообщалось в моей команде выполнения, я уже увеличил эти значения до 20 г и 50 г.

Я смог решить, добавив параметр --verbose в свою команду spark-submit: таким образом я заметил, чтомои параметры не были правильно проанализированы искрой:

Parsed arguments:
  master                  local[*]
  deployMode              null
  executorMemory          null
  executorCores           null
  totalExecutorCores      null
  propertiesFile          /usr/iop/4.2.0.0/spark/conf/spark-defaults.conf
  driverMemory            null

Решение было экспортировать следующий путь среды:

export SPARK_DRIVER_MEMORY=8g
export SPARK_EXECUTOR_MEMORY=16g

После этого подробный вывод был следующим:

Parsed arguments:
  master                  local[*]
  deployMode              null
  executorMemory          16g
  executorCores           null
  totalExecutorCores      null
  propertiesFile          /usr/iop/4.2.0.0/spark/conf/spark-defaults.conf
  driverMemory            8g
...