Почему происходит сбой DataFrame.stat.approxQuantile, поскольку размер сериализованных результатов n задач (1030,8 МБ) больше, чем spark.driver.maxResultSize - PullRequest
0 голосов
/ 02 июня 2019

val postsQuantiles = posts.stat.approxQuantile("_score", Array(0.25, 0.75), 0.0) сбой со следующей ошибкой. Очевидно, я могу установить spark.driver.maxResultSize, чтобы обойти эту ошибку, но мне любопытно, зачем это собирать данные для драйвера?

[Stage 3:==================>                                      (7 + 15) / 22]19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 18 tasks (1030.8 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 19 tasks (1087.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 20 tasks (1145.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 21 tasks (1203.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 22 tasks (1261.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
[Stage 3:====================================>                    (14 + 8) / 22]org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 18 tasks (1030.8 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
  at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
  at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
  at org.apache.spark.sql.execution.stat.StatFunctions$.multipleApproxQuantiles(StatFunctions.scala:102)
  at org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:100)
  at org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:75)
  ... 56 elided

1 Ответ

3 голосов
/ 02 июня 2019

Метод approxQuantile следует алгоритму Гринвальда-Кханна для вычисления приближенного квантиля (на основе их документации https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions).. Он позволяет вам выбрать относительную погрешность.

В документации они предупреждают вас, что выбор относительной ошибки 0.0 (как у вас есть) может быть очень дорогим, и это именно то, что вы видите. Алгоритм больше подходит для приближенного квантиля, чем для прямого квантиля. Причина, по которой извлекается так много данных, заключается в том, что для вычисления прямого квантиля необходимо как минимум вытянуть все данные из столбца в драйвер.

Подробнее об алгоритме можно узнать из опубликованной статьи: http://infolab.stanford.edu/~datar/courses/cs361a/papers/quantiles.pdf

Чтобы преодолеть это, я бы предложил использовать термин относительной ошибки с некоторым подходящим небольшим значением, что дает вам уверенность " достаточно близкую ".

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...