Учитывая, что умеренно сложный sql - который выполняет агрегацию (collect_list
и set_of_strings
), а также join
- не ошибся и фактически работает хорошо (быстро):
select size(set_of_strings(tps)) tpss, size(tps) tps, tp,
size(set_of_strings(fps)) fpss, size(fps) fps, fp,
size(set_of_strings(fns)) fnss, size(fns) fns, fn from
(select collect_list(accuracy.tpId) tps, sum(accuracy.tp) tp,
collect_list(accuracy.fpId) fps, sum(accuracy.fp) fp,
collect_list(accuracy.fnId) fns, sum(accuracy.fn) fn /*, a.* */ from
(select t.src, s.dst as s_dst, explode(accuracy_intersection(t.grouped_entities, contactSrcIds)) accuracy, t.grouped_entities, s.contactSrcIds
from
(select explode(grouped_entities) src, grouped_entities from truth) t
join
(select dst, set_of_strings(flatten(collect_list(dstIds))) contactSrcIds from matches group by dst) s
on t.src = s.dst
) a
)
Обратите внимание, что set_of_strings
и accuracy_intersection
являются пользовательскими UDFS
.Учитывая, что все вышеперечисленное работает нормально (и они впоследствии не используются), не важно обсуждать их дальше.
Теперь проблема заключается в следующем запросе, который использует результаты этого первого запроса - которыйотображается как таблица accuracy1
.Запрос очень прост:
select cast(tpss as double)/(cast(tpss as double)+cast(fpss as double)) accuracy_s,
cast(tpss as double)/(cast(tpss as double)+cast(fnss as double)) recall_s, tpss, tp, fpss, fp, fnss, fn from accuracy1
Обратите внимание, что нет ни агрегатов, ни объединений.Это простое арифметическое вычисление поверх полей из одной строки. Я ожидал бы, что эта однострочная операция будет простой и высокопроизводительной для spark-sql
.
Вместо этого sql постоянно выдает ошибку с OOME
.Я не могу понять, что может привести к тому, что это произойдет: здесь нет расширений данных - это всего лишь (скромное количество) процессорная обработка.
Подробности первой обработки запроса
Производительность: 19,5 секунд для строк 2Meg:
18/11/30 11:25:33 INFO MetricUtils $: [11-30 11:25:33] -метрика (.sbg-graph-pipe. .accuracy1_all-SQL для precision1-duration): 19,573сек
Типы данных для результата первого запроса при его записи в parquet
:
18/11/30 11:25:33 DEBUG ParquetStorage$: Schema of data to write: root
|-- tpss: integer (nullable = false)
|-- tps: integer (nullable = false)
|-- tp: long (nullable = true)
|-- fpss: integer (nullable = false)
|-- fps: integer (nullable = false)
|-- fp: long (nullable = true)
|-- fnss: integer (nullable = false)
|-- fns: integer (nullable = false)
|-- fn: long (nullable = true)
Исключение, сгенерированное во втором запросе
18/11/30 11:28:02 INFO MetricUtils$: accuracy2_name: [select cast(tpss as double)/(cast(tpss as double)+cast(fpss as double)) accuracy_s,
cast(tpss as double)/(cast(tpss as double)+cast(fnss as double)) recall_s, tpss, tp, fpss, fp, fnss, fn from accuracy1]
18/11/30 11:28:02 INFO MetricUtils$: [11-30 11:28:02] --metric(sbg-graph-pipeline. .accuracy2_name-SQL for accuracy-start): Fri Nov 30 11:28:02 PST 2018 true
18/11/30 11:28:02 INFO MetricUtils$: [11-30 11:28:02] --metric( .sbg-graph-pipeline. .accuracy2_name-SQL for accuracy-duration): 0.02secs
18/11/30 11:31:43 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.mutable.LinkedHashMap.createNewEntry(LinkedHashMap.scala:155)
at scala.collection.mutable.LinkedHashMap.createNewEntry(LinkedHashMap.scala:49)
at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:169)
at scala.collection.mutable.LinkedHashMap.findOrAddEntry(LinkedHashMap.scala:49)
at scala.collection.mutable.LinkedHashMap.put(LinkedHashMap.scala:71)
at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:91)
at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:49)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
at scala.collection.mutable.LinkedHashMap.foreach(LinkedHashMap.scala:141)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.AbstractMap.$plus$plus$eq(Map.scala:80)
at scala.collection.mutable.MapLike$class.clone(MapLike.scala:223)
at scala.collection.mutable.AbstractMap.clone(Map.scala:80)
at scala.collection.mutable.MapLike$class.$plus$plus(MapLike.scala:143)
at scala.collection.mutable.AbstractMap.$plus$plus(Map.scala:80)
at org.apache.spark.executor.TaskMetrics.nameToAccums$lzycompute(TaskMetrics.scala:235)
at org.apache.spark.executor.TaskMetrics.nameToAccums(TaskMetrics.scala:235)
at org.apache.spark.executor.TaskMetrics$$anonfun$fromAccumulatorInfos$2.apply(TaskMetrics.scala:302)
at org.apache.spark.executor.TaskMetrics$$anonfun$fromAccumulatorInfos$2.apply(TaskMetrics.scala:296)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.executor.TaskMetrics$.fromAccumulatorInfos(TaskMetrics.scala:296)
at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1$$anonfun$apply$11.apply(AppStatusListener.scala:606)
at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1$$anonfun$apply$11.apply(AppStatusListener.scala:605)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1.apply(AppStatusListener.scala:605)
at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1.apply(AppStatusListener.scala:604)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
18/11/30 11:32:00 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-appStatus
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.mutable.LinkedHashMap.createNewEntry(LinkedHashMap.scala:155)
at scala.collection.mutable.LinkedHashMap.createNewEntry(LinkedHashMap.scala:49)
at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:169)
at scala.collection.mutable.LinkedHashMap.findOrAddEntry(LinkedHashMap.scala:49)
at scala.collection.mutable.LinkedHashMap.put(LinkedHashMap.scala:71)
at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:91)
at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:49)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
at scala.collection.mutable.LinkedHashMap.foreach(LinkedHashMap.scala:141)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.AbstractMap.$plus$plus$eq(Map.scala:80)
at scala.collection.mutable.MapLike$class.clone(MapLike.scala:223)
at scala.collection.mutable.AbstractMap.clone(Map.scala:80)
at scala.collection.mutable.MapLike$class.$plus$plus(MapLike.scala:143)
at scala.collection.mutable.AbstractMap.$plus$plus(Map.scala:80)
at org.apache.spark.executor.TaskMetrics.nameToAccums$lzycompute(TaskMetrics.scala:235)
at org.apache.spark.executor.TaskMetrics.nameToAccums(TaskMetrics.scala:235)
at org.apache.spark.executor.TaskMetrics$$anonfun$fromAccumulatorInfos$2.apply(TaskMetrics.scala:302)
at org.apache.spark.executor.TaskMetrics$$anonfun$fromAccumulatorInfos$2.apply(TaskMetrics.scala:296)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.executor.TaskMetrics$.fromAccumulatorInfos(TaskMetrics.scala:296)
at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1$$anonfun$apply$11.apply(AppStatusListener.scala:606)
at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1$$anonfun$apply$11.apply(AppStatusListener.scala:605)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1.apply(AppStatusListener.scala:605)
at org.apache.spark.status.AppStatusListener$$anonfun$onExecutorMetricsUpdate$1.apply(AppStatusListener.scala:604)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
Exception in thread "spark-listener-group-appStatus" java.lang.OutOfMemoryError: GC overhead limit exceeded
Примечание: этот запрос выполняется в локальном [*] искровой экземпляр: поэтому память распределяется между драйвером и исполнителями .
Обновление Ошибка возникает в Spark
Исходном коде TaskMetrics.scala
при добавлении новой метрики здесь:
import InternalAccumulator._
@transient private[spark] lazy val nameToAccums = LinkedHashMap(
EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
EXECUTOR_RUN_TIME -> _executorRunTime,
EXECUTOR_CPU_TIME -> _executorCpuTime,
RESULT_SIZE -> _resultSize,
JVM_GC_TIME -> _jvmGCTime,
RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
DISK_BYTES_SPILLED -> _diskBytesSpilled,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
shuffleRead.REMOTE_BYTES_READ_TO_DISK -> shuffleReadMetrics._remoteBytesReadToDisk,
shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
input.BYTES_READ -> inputMetrics._bytesRead,
input.RECORDS_READ -> inputMetrics._recordsRead,
output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
) ++ testAccum.map(TEST_ACCUM -> _)
Что я не понимаю, так эточто количество записей, обработанных в предыдущем запросе, который работал нормально, совпадает с количеством записей в этом запросе, который завершается неудачно.Нужно ли unpersist
результаты предыдущего запроса, чтобы освободить память для метрик внутренних свечей?Я попробую это ..
Другое обновление : По предложению @ user6910411 я рассмотрел некоторые параметры sparkConf
.Была предпринята попытка сделать следующее:
.set("spark.driver.memory", "2g") // default is 1G
.set("spark.memory.fraction","0.4") // default is 0.6
.set("spark.memory.storageFraction","0.3") // default is 0.5
Это не повлияло на результат: все еще есть OOME.
Пока работает только один подход - выполнить два запроса в отдельных заданиях спарка.В этом случае второй запрос, который получает OOME, прекрасно работает менее чем за две секунды.