Почему простой расчет - без каких-либо агрегатов или объединений - приводит к появлению OOME в Spark SQL? - PullRequest
0 голосов
/ 30 ноября 2018

Учитывая, что умеренно сложный sql - который выполняет агрегацию (collect_list и set_of_strings), а также join - не ошибся и фактически работает хорошо (быстро):

select size(set_of_strings(tps)) tpss, size(tps) tps, tp,
          size(set_of_strings(fps)) fpss, size(fps) fps, fp,
          size(set_of_strings(fns)) fnss, size(fns) fns, fn from
        (select collect_list(accuracy.tpId) tps, sum(accuracy.tp) tp,
          collect_list(accuracy.fpId) fps, sum(accuracy.fp) fp,
          collect_list(accuracy.fnId) fns, sum(accuracy.fn) fn /*, a.* */ from
            (select t.src, s.dst as s_dst, explode(accuracy_intersection(t.grouped_entities, contactSrcIds)) accuracy, t.grouped_entities, s.contactSrcIds
              from
                (select explode(grouped_entities) src, grouped_entities from truth) t
               join
               (select dst, set_of_strings(flatten(collect_list(dstIds))) contactSrcIds from matches group by dst) s
              on t.src = s.dst
            ) a
        )

Обратите внимание, что set_of_strings и accuracy_intersection являются пользовательскими UDFS.Учитывая, что все вышеперечисленное работает нормально (и они впоследствии не используются), не важно обсуждать их дальше.

Теперь проблема заключается в следующем запросе, который использует результаты этого первого запроса - которыйотображается как таблица accuracy1.Запрос очень прост:

select cast(tpss as double)/(cast(tpss as double)+cast(fpss as double)) accuracy_s,
            cast(tpss as double)/(cast(tpss as double)+cast(fnss as double)) recall_s, tpss, tp, fpss, fp, fnss, fn from accuracy1

Обратите внимание, что нет ни агрегатов, ни объединений.Это простое арифметическое вычисление поверх полей из одной строки. Я ожидал бы, что эта однострочная операция будет простой и высокопроизводительной для spark-sql.

Вместо этого sql постоянно выдает ошибку с OOME.Я не могу понять, что может привести к тому, что это произойдет: здесь нет расширений данных - это всего лишь (скромное количество) процессорная обработка.

Подробности первой обработки запроса

  • Производительность: 19,5 секунд для строк 2Meg:

    18/11/30 11:25:33 INFO MetricUtils $: [11-30 11:25:33] -метрика (.sbg-graph-pipe. .accuracy1_all-SQL для precision1-duration): 19,573сек

  • Типы данных для результата первого запроса при его записи в parquet:

     18/11/30 11:25:33 DEBUG ParquetStorage$:  Schema of data to write: root
      |-- tpss: integer (nullable = false)
      |-- tps: integer (nullable = false)
      |-- tp: long (nullable = true)
      |-- fpss: integer (nullable = false)
      |-- fps: integer (nullable = false)
      |-- fp: long (nullable = true)
      |-- fnss: integer (nullable = false)
      |-- fns: integer (nullable = false)
      |-- fn: long (nullable = true)

Исключение, сгенерированное во втором запросе

18/11/30 11:28:02 INFO MetricUtils$: accuracy2_name: [select cast(tpss as double)/(cast(tpss as double)+cast(fpss as double)) accuracy_s,
            cast(tpss as double)/(cast(tpss as double)+cast(fnss as double)) recall_s, tpss, tp, fpss, fp, fnss, fn from accuracy1]
18/11/30 11:28:02 INFO MetricUtils$: [11-30 11:28:02] --metric(sbg-graph-pipeline. .accuracy2_name-SQL for accuracy-start): Fri Nov 30 11:28:02 PST 2018 true
18/11/30 11:28:02 INFO MetricUtils$: [11-30 11:28:02] --metric( .sbg-graph-pipeline. .accuracy2_name-SQL for accuracy-duration): 0.02secs
18/11/30 11:31:43 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.mutable.LinkedHashMap.createNewEntry(LinkedHashMap.scala:155)
    at scala.collection.mutable.LinkedHashMap.createNewEntry(LinkedHashMap.scala:49)
    at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.LinkedHashMap.findOrAddEntry(LinkedHashMap.scala:49)
    at scala.collection.mutable.LinkedHashMap.put(LinkedHashMap.scala:71)
    at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:91)
    at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:49)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.mutable.LinkedHashMap.foreach(LinkedHashMap.scala:141)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.AbstractMap.$plus$plus$eq(Map.scala:80)
    at scala.collection.mutable.MapLike$class.clone(MapLike.scala:223)
    at scala.collection.mutable.AbstractMap.clone(Map.scala:80)
    at scala.collection.mutable.MapLike$class.$plus$plus(MapLike.scala:143)
    at scala.collection.mutable.AbstractMap.$plus$plus(Map.scala:80)
    at org.apache.spark.executor.TaskMetrics.nameToAccums$lzycompute(TaskMetrics.scala:235)
    at org.apache.spark.executor.TaskMetrics.nameToAccums(TaskMetrics.scala:235)
    at org.apache.spark.executor.TaskMetrics$$anonfun$fromAccumulatorInfos$2.apply(TaskMetrics.scala:302)
    at org.apache.spark.executor.TaskMetrics$$anonfun$fromAccumulatorInfos$2.apply(TaskMetrics.scala:296)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.executor.TaskMetrics$.fromAccumulatorInfos(TaskMetrics.scala:296)
    at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1$$anonfun$apply$11.apply(AppStatusListener.scala:606)
    at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1$$anonfun$apply$11.apply(AppStatusListener.scala:605)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1.apply(AppStatusListener.scala:605)
    at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1.apply(AppStatusListener.scala:604)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
18/11/30 11:32:00 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-appStatus
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.mutable.LinkedHashMap.createNewEntry(LinkedHashMap.scala:155)
    at scala.collection.mutable.LinkedHashMap.createNewEntry(LinkedHashMap.scala:49)
    at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.LinkedHashMap.findOrAddEntry(LinkedHashMap.scala:49)
    at scala.collection.mutable.LinkedHashMap.put(LinkedHashMap.scala:71)
    at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:91)
    at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:49)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.mutable.LinkedHashMap.foreach(LinkedHashMap.scala:141)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.AbstractMap.$plus$plus$eq(Map.scala:80)
    at scala.collection.mutable.MapLike$class.clone(MapLike.scala:223)
    at scala.collection.mutable.AbstractMap.clone(Map.scala:80)
    at scala.collection.mutable.MapLike$class.$plus$plus(MapLike.scala:143)
    at scala.collection.mutable.AbstractMap.$plus$plus(Map.scala:80)
    at org.apache.spark.executor.TaskMetrics.nameToAccums$lzycompute(TaskMetrics.scala:235)
    at org.apache.spark.executor.TaskMetrics.nameToAccums(TaskMetrics.scala:235)
    at org.apache.spark.executor.TaskMetrics$$anonfun$fromAccumulatorInfos$2.apply(TaskMetrics.scala:302)
    at org.apache.spark.executor.TaskMetrics$$anonfun$fromAccumulatorInfos$2.apply(TaskMetrics.scala:296)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.executor.TaskMetrics$.fromAccumulatorInfos(TaskMetrics.scala:296)
    at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1$$anonfun$apply$11.apply(AppStatusListener.scala:606)
    at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1$$anonfun$apply$11.apply(AppStatusListener.scala:605)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1.apply(AppStatusListener.scala:605)
    at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1.apply(AppStatusListener.scala:604)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
Exception in thread "spark-listener-group-appStatus" java.lang.OutOfMemoryError: GC overhead limit exceeded

Примечание: этот запрос выполняется в локальном [*] искровой экземпляр: поэтому память распределяется между драйвером и исполнителями .

Обновление Ошибка возникает в Spark Исходном коде TaskMetrics.scala при добавлении новой метрики здесь:

  import InternalAccumulator._
  @transient private[spark] lazy val nameToAccums = LinkedHashMap(
    EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
    EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
    EXECUTOR_RUN_TIME -> _executorRunTime,
    EXECUTOR_CPU_TIME -> _executorCpuTime,
    RESULT_SIZE -> _resultSize,
    JVM_GC_TIME -> _jvmGCTime,
    RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
    MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
    DISK_BYTES_SPILLED -> _diskBytesSpilled,
    PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
    UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
    shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
    shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
    shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
    shuffleRead.REMOTE_BYTES_READ_TO_DISK -> shuffleReadMetrics._remoteBytesReadToDisk,
    shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
    shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
    shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
    shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
    shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
    shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
    input.BYTES_READ -> inputMetrics._bytesRead,
    input.RECORDS_READ -> inputMetrics._recordsRead,
    output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
    output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
  ) ++ testAccum.map(TEST_ACCUM -> _)

Что я не понимаю, так эточто количество записей, обработанных в предыдущем запросе, который работал нормально, совпадает с количеством записей в этом запросе, который завершается неудачно.Нужно ли unpersist результаты предыдущего запроса, чтобы освободить память для метрик внутренних свечей?Я попробую это ..

Другое обновление : По предложению @ user6910411 я рассмотрел некоторые параметры sparkConf.Была предпринята попытка сделать следующее:

.set("spark.driver.memory", "2g")               // default is 1G
.set("spark.memory.fraction","0.4")             // default is 0.6
.set("spark.memory.storageFraction","0.3")      // default is 0.5

Это не повлияло на результат: все еще есть OOME.

Пока работает только один подход - выполнить два запроса в отдельных заданиях спарка.В этом случае второй запрос, который получает OOME, прекрасно работает менее чем за две секунды.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...